Key Takeaways
- Forking in Ruby, though relatively expensive, can compete with threading if the Ruby implementation doesn’t squander Copy-on-Write optimization. However, data produced by a method called in a fork will not be available to the parent process due to process isolation.
- Pipes, UNIX-Sockets, and Distributed Ruby are methods for interprocess communication (IPC) in Ruby. Pipes allow data to flow in one direction between a pair of file descriptors, while UNIX-Sockets are bidirectional and can also use datagrams. Distributed Ruby allows for the execution of code remotely by sending messages to distributed objects.
- To move objects between processes, they need to be serialized into a byte format. Ruby’s Marshal can serialize most Ruby objects, allowing them to be transferred across byte-transmission mechanisms like pipes or sockets.
- The Forkable module allows for the execution of a method in another process and returns the result back in the original process, enabling asynchronous method execution. The Cod library simplifies IPC in Ruby by using higher-level IPC mechanisms it calls channels, which can transmit Ruby objects.
- Forking servers generally use pre-forking, creating a pool of processes that wait for connections. The Parallel gem provides the ability to achieve parallelism in CRuby without getting into the specifics of forks and IPC. Thinking in terms of processes sending messages allows for more granular scaling.
In the first article, we examined why the fork() system call is useful and where it fits into the grand scheme of things. We saw that by passing a block to Kernel#fork or Process#fork, it is possible to execute arbitrary code concurrently (or in parallel if there are multiple processors). In addition, we saw that although forking is relatively expensive, it can compete with threading if the Ruby implementation doesn’t squander Copy-on-Write optimization.
Unfortunately, if a method is called in a fork, any data that it produces will not be available to the parent process, due to process isolation. What we need is a conduit through which we can pass data between processes. Here, in Part II, we’re going to cover some interprocess communication mechanisms as well as more ways that you can utilize fork.
The code in this article is available on GitHub.
Pipes
Pipes enable data to flow in one direction between a pair of file descriptors. Since forks inherit open file descriptors, pipes can be used to communicate data between parent and child processes. Creating a pipe in Ruby is easy with IO.pipe.
>> reader, writer = IO.pipe
=> [#<IO:fd 5>, #<IO:fd 6>]
Here, writer will only write, and reader will only read. Sounds simple, but unfortunately there are some nuances. If you want to use a pipe more than once, IO#puts complemented by IO#gets is the simplest way:
>> writer.puts("hello world")
>> reader.gets
=> "hello world\n"
>> writer.puts("hello world, again")
>> reader.gets
=> "hello world, again\n"
Pipes communicate data using byte streams, so they need delimiters to know when to stop reading data. IO#gets reads until it receives a “\n”. Unlike IO#puts, IO#write does not automatically append “\n” to data, so an IO#gets that follows an IO#write of unterminated data will block indefinitely.
>> writer.write("this string is terminated\n")
>> reader.gets
=> "this string is terminated\n"
>> writer.write("this string is not terminated")
>> reader.gets
*waits indefinitely for \n*
Unlike IO#gets, IO#read (called without a length) waits for EOF. A pipe’s reader only sees EOF once every copy of the write end has been closed, so you will generally use IO#read for one-time writes.
>> reader, writer = IO.pipe
>> writer.write("hello world")
>> writer.close
>> reader.read
=> "hello world"
>> reader.read
=> ""
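Newlines and EOF are not the only ways to delimit data. A common alternative is to prefix every message with its byte length, so the reader knows exactly how much to consume even when the payload contains newlines. The sketch below assumes a 4-byte big-endian header (packed with Array#pack("N")); `write_frame` and `read_frame` are hypothetical helper names, not part of Ruby.

```ruby
# Hypothetical helpers for length-prefixed messages over a pipe.
def write_frame(io, payload)
  bytes = payload.b                        # raw bytes, so bytesize is exact
  io.write([bytes.bytesize].pack("N"))     # 4-byte big-endian length header
  io.write(bytes)
end

def read_frame(io)
  header = io.read(4)                      # nil once the writer is closed
  return nil if header.nil?
  io.read(header.unpack1("N"))             # read exactly that many bytes
end

reader, writer = IO.pipe
write_frame(writer, "line one\nline two")  # an embedded newline is fine
write_frame(writer, "no newline at all")
writer.close

frames = []
while (frame = read_frame(reader))
  frames << frame
end
p frames
```

Because each frame carries its own length, the same helpers work for arbitrary binary payloads, not just text.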
Ok, so how would we connect a pipe between a parent process and one of its children? Because a fork inherits copies of the parent’s variables and open file descriptors, we only need to create the pipe once, before forking. However, since the child gets its own copy of both ends of the pipe (reader and writer), each process should close the end it doesn’t use. Here is how you would send data from a child to parent.
child_parent_pipe.rb
# creates a fork and pipes a string from child to parent
reader, writer = IO.pipe
fork do
  reader.close                          # the child only writes
  writer.puts "sent from child process"
end
writer.close                            # the parent only reads
from_child = reader.gets
puts from_child
Process.wait                            # reap the finished child
We can also make it work the other way. Just make sure to close the writer on the end that will read and the reader on the end that will write.
parent_child_pipe.rb
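# creates a fork and pipes a string from parent to child
reader, writer = IO.pipe
fork do
  writer.close                          # the child only reads
  from_parent = reader.gets
  puts from_parent
end
reader.close                            # the parent only writes
writer.puts "sent from parent process"  # puts adds the "\n" that gets waits for
writer.close
Process.wait                            # keep the parent alive until the child prints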
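Since a pipe only flows one way, a back-and-forth conversation over pipes needs two of them, one per direction. Here is a minimal sketch, assuming a child that simply echoes each line back in upper case; the `sync = true` lines make sure no data sits in a userspace buffer while the other side is waiting for it.

```ruby
# Two pipes, one per direction, give parent and child a two-way conversation.
to_child_r,  to_child_w  = IO.pipe   # parent -> child
to_parent_r, to_parent_w = IO.pipe   # child  -> parent
to_child_w.sync  = true              # flush every write immediately
to_parent_w.sync = true

pid = fork do
  # The child reads requests and writes replies, so it closes the other ends.
  to_child_w.close
  to_parent_r.close
  while (line = to_child_r.gets)     # gets returns nil once the parent closes
    to_parent_w.puts(line.upcase)
  end
end

# The parent uses the opposite ends.
to_child_r.close
to_parent_w.close

replies = %w[ping pong].map do |word|
  to_child_w.puts(word)
  to_parent_r.gets.chomp             # block until the child's reply arrives
end

to_child_w.close                     # the child's loop ends and it exits
Process.wait(pid)
p replies
```

Closing the unused ends matters here: if the child kept its copy of `to_child_w` open, its `gets` would never see EOF and it would never exit.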
UNIX-Sockets
You can think of UNIX domain sockets as pipes with two big advantages:
- UNIX sockets are bidirectional whereas pipes are unidirectional
- Pipes can only use byte streams, but UNIX sockets can also use datagrams
Unlike network sockets, they can only communicate data between two processes on the same machine, using the filesystem as their address space. As a result, they can be very lightweight, making them a good fit for interprocess communication.
unix_sockets.rb
# creates a pair of UNIX sockets that send and receive a string
require 'socket'
parent_socket, child_socket = UNIXSocket.pair
fork do
  parent_socket.close
  child_socket.send("sent from child (#{$$})", 0)
  from_parent = child_socket.recv(100)
  puts from_parent
end
child_socket.close
parent_socket.send("sent from parent (#{$$})", 0)
from_child = parent_socket.recv(100)
puts from_child
Process.wait                            # reap the child after both messages
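The datagram mode mentioned above is selected by passing a socket type to UNIXSocket.pair. In this sketch each send becomes its own datagram, so each recv returns exactly one message even though it allows up to 100 bytes; over the default byte stream, two quick sends could arrive merged into one read.

```ruby
require 'socket'

# :DGRAM asks for datagram sockets instead of the default byte stream.
parent_socket, child_socket = UNIXSocket.pair(:DGRAM)

pid = fork do
  parent_socket.close
  child_socket.send("first", 0)      # one datagram
  child_socket.send("second", 0)     # a second, separate datagram
  child_socket.close
end

child_socket.close
# Each recv returns one whole datagram, never part of the next one.
messages = 2.times.map { parent_socket.recv(100) }
Process.wait(pid)
p messages
```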
Distributed Ruby
Both pipes and UNIX sockets have a couple of downsides:
- They are low-level, byte-transmission mechanisms. For complex behavior you will need to implement existing protocols or define your own.
- They cannot communicate across the machine barrier, limiting their usability in massively-scalable scenarios.
Distributed Ruby lets you create and consume what might be called “distributed object services.” These services let you execute code remotely by sending messages to distributed objects.
We execute code on remote machines all the time, but we do it through quite a bit of indirection. For example, let’s say you go to a web address like http://searchforallthestuffs.com/search?q=ponies. Your request hits a router which sees the route you specified and passes the arguments to the corresponding controller, which then executes the code associated with that route (generate an HTML view, JSON, XML, etc).
If your goal is to execute code remotely, this…kinda sucks. Every time you add a new feature, you need a route in place and perhaps a new controller. That’s a lot of overhead just to add a method to a class.
Distributed objects let you execute code remotely, but with an object receiving the message rather than an address.
distributed.rb
# Starts several DRb worker services (all in this one process)
# and concurrently waits for their results
require 'drb'
NUM_WORKERS = 4
class Worker
  def calculate
    time_to_work = rand(1..7)
    sleep time_to_work
    return time_to_work
  end
  def stop
    # stop the server handling this call (DRb.stop_service would only
    # stop the primary server, i.e. the last one started)
    DRb.current_server.stop_service
  end
end
# Start object services
NUM_WORKERS.times do |i|
  DRb.start_service("druby://localhost:700#{i}", Worker.new)
  puts "Worker running at #{DRb.uri}"
end
# Create a local end-point for each service
workers = NUM_WORKERS.times.map { |i| DRbObject.new nil, "druby://localhost:700#{i}" }
# Concurrently wait for each calculation; the fastest prints first
thread_pool = []
NUM_WORKERS.times do |i|
  thread_pool << Thread.new do
    answer = workers[i].calculate
    puts "Worker #{i} finished in #{answer} seconds"
  end
end
# Wait for every thread to get its answer
thread_pool.each(&:join)
# Shut down each worker
workers.each { |w| w.stop }
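In the example above, every service runs inside the same Ruby process. The sketch below puts a distributed object behind a real process boundary instead: `Counter` is a hypothetical class, port 0 asks the OS for any free port, and a pipe carries the resulting URI back to the parent. Note that as of Ruby 3.4, `drb` is a bundled gem rather than a default library, so under Bundler it may need to be listed in your Gemfile.

```ruby
require 'drb'

# Hypothetical remote object: any plain Ruby object can be served.
class Counter
  def initialize
    @count = 0
  end

  def increment
    @count += 1
  end

  def stop
    # Shuts down the server handling this call once the reply is sent.
    DRb.current_server.stop_service
  end
end

reader, writer = IO.pipe

pid = fork do
  reader.close
  DRb.start_service("druby://localhost:0", Counter.new)  # port 0: any free port
  writer.puts(DRb.uri)               # tell the parent where to connect
  writer.close
  DRb.thread.join                    # serve requests until stop_service
end

writer.close
uri = reader.gets.chomp
reader.close

counter = DRbObject.new_with_uri(uri)
results = 3.times.map { counter.increment }   # each call runs in the child
counter.stop
Process.wait(pid)
p results
```

The counter’s state lives only in the child; the parent holds nothing but a proxy that forwards each message over the socket.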
Moving Objects between Processes
Since low-level communication constructs like pipes and sockets transfer bytes, not objects, you will need to encode your objects into a byte format (i.e. serialize them) in order to move them across the process barrier.
Fortunately, Ruby ships with Marshal, which can serialize most Ruby objects.
From Ruby-Doc.org: “Some objects cannot be dumped: if the objects to be dumped include bindings, procedure or method objects, instances of class IO, or singleton objects, a TypeError will be raised.”
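A quick sketch of both sides of that rule: a Struct survives a dump/load round trip, while a Proc raises TypeError. It also shows that even a tiny payload such as `Marshal.dump(5)` contains a newline byte, which is worth remembering before reading Marshal data with IO#gets. `Point` is just an illustrative name.

```ruby
Point = Struct.new(:x, :y)

# Round trip: dump to a binary String, then load an equal copy.
bytes = Marshal.dump(Point.new(1, 2))
copy  = Marshal.load(bytes)
p copy

# Procs (like bindings, methods, and IO objects) can't be dumped.
error = begin
  Marshal.dump(proc { 42 })
  nil
rescue TypeError => e
  e
end
puts error.message

# Marshal output is binary: the Integer 5 is encoded with a "\n" byte.
p Marshal.dump(5).include?("\n")
```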
serialization.rb
# Ruby objects can be serialized by Marshal to move across pipes
# or sockets
Tire = Struct.new(:radius, :pressure)
reader, writer = IO.pipe
fork do
  reader.close
  tire = Tire.new(7, 28)
  tire_data = Marshal.dump(tire)
  writer.write tire_data
end
writer.close
# Marshal output is binary and may contain "\n" bytes, so read until
# EOF (the child closes its end on exit) rather than using gets
tire_data = reader.read
tire = Marshal.load(tire_data)
puts tire.inspect
Process.wait
Creating a Module for Asynchronous Method Calls
If you can move objects between processes, then you can execute a method in another process and get the result back in the original. This effectively enables asynchronous method execution.
Here, I have a module called Forkable that gives a class the ability to execute methods in parallel. The only thing that is different from before is that the pipe is read from in a new thread, and what comes out of the pipe is yielded to the block.
forkable.rb
# A module that lets you fork a method and get its result in a block
# Notes:
# 1. Forked methods can't take blocks in this implementation
# 2. Callback threads will be destroyed if the main process exits
# 3. Not production ready. For educational purposes only.
####################################################################
# If included in a class #fork_method will run a method in another process
# and yield the result to a block
module Forkable
  def fork_method(method, *args)
    reader, writer = IO.pipe
    fork do
      reader.close
      result = self.send(method, *args)
      child_data = Marshal.dump(result)
      writer.write(child_data)
    end
    writer.close
    Thread.new do
      # Marshal data may contain "\n", so read until EOF instead of using gets
      data_from_child = reader.read
      reader.close
      yield Marshal.load(data_from_child) if block_given?
    end
  end
end
# An object that takes a random amount of time to instantiate
class ExpensiveObject
  attr_reader :expense
  def initialize(max_expense)
    @expense = rand(1..max_expense)
    sleep @expense
  end
end
# A worker that forks, a...forker
class Forker
  include Forkable
  def calculate(max_expense)
    return ExpensiveObject.new(max_expense)
  end
end
# Create 3 ExpensiveObjects and print how long they took to create
f = Forker.new
callbacks = 3.times.map do
  f.fork_method(:calculate, 7) do |result|
    puts "result: #{result.inspect}"
  end
end
puts "waiting on results..."
callbacks.each(&:join)   # let every callback finish before exiting
Process.waitall
puts "main process finished"
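If you would rather block on a result later than register a callback, the same pipe-plus-Marshal idea can hand back a Thread whose value is the child’s result. This is a sketch built around a hypothetical `fork_future` helper, separate from Forkable; if the child raises, `reader.read` returns an empty string and `Marshal.load` raises in the parent when you call `value`.

```ruby
# Hypothetical helper: run the block in a child process and return a Thread
# whose #value is the block's result, deserialized in the parent.
def fork_future
  reader, writer = IO.pipe
  pid = fork do
    reader.close
    writer.write(Marshal.dump(yield))
    writer.close
  end
  writer.close
  Thread.new do
    data = reader.read               # EOF arrives when the child closes its end
    reader.close
    Process.wait(pid)                # reap the child so it does not linger
    Marshal.load(data)
  end
end

futures = [2, 3, 4].map { |n| fork_future { n ** 10 } }  # all three run at once
results = futures.map(&:value)       # Thread#value joins, then returns the result
p results
```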
Cod
Cod is a library that aims to make IPC easier in Ruby. Earlier, I demonstrated how to serialize objects for transfer across byte-transmission mechanisms like pipes or sockets. Cod takes care of this for us. It uses higher-level IPC mechanisms it calls channels. Channels don’t need one end closed in each process, and they can transmit Ruby objects directly. A channel is created in Cod (however oddly) with Cod.pipe.
cod.rb
# The cod gem simplifies IPC. Ruby objects can be sent across channels.
# Installation:
# $ gem install cod
require 'cod'
Tire = Struct.new(:radius, :pressure)
channel = Cod.pipe
3.times do
  fork do
    radius = rand(8..14)
    pressure = rand(24..33)
    channel.put Tire.new(radius, pressure)
  end
end
tires = 3.times.map { channel.get }
puts tires.inspect
The cod gem does more than I can cover here (for example, it has beanstalkd integration). The website has a lot of examples, so definitely check it out.
Pre-forking Servers
Let’s look at a common example of forking in practice: servers. Some servers that use forking to achieve parallelism include:
- Apache (Prefork MultiProcessing Module)
- Unicorn
- Rainbows! (based on Unicorn)
Specifically, forking servers generally use pre-forking. The concept works like this: forking a process might be cheaper than copying one, but it’s not free. Since running a web server generally consists of opening and closing lots of short-lived connections, it might not be a good idea to wait to create a new child process until a new connection is accepted. Instead, we can go ahead and fork a few times and let the forks wait for connections. You will often see this described as running a “pool of processes.”
So what might that look like?
preforking.rb
# Starts an echo server that can service 3 clients in parallel
require 'socket'
server = TCPServer.new 'localhost', 3000
trap("EXIT") { server.close }
trap("INT") { exit }
3.times do
  fork do
    # each worker serves one client at a time, then accepts the next
    loop do
      sock = server.accept
      sock.puts "You are connected to process #{$$}:"
      while recv = sock.gets
        sock.puts("ECHO:> #{recv}")
      end
      sock.close
    end
  end
end
Process.waitall
If you run this code, you should be able to connect on 3 separate terminals.
telnet localhost 3000
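To see a pool at work without opening terminals, here is a self-contained sketch: it pre-forks two workers on an OS-assigned port (port 0), connects a few clients, records which worker PID answered each one, and then terminates the pool. It is a harness for observing the idea, not a production server.

```ruby
require 'socket'

server = TCPServer.new('127.0.0.1', 0)   # port 0: let the OS pick a free port
port = server.addr[1]

# Pre-fork the pool; every worker blocks in accept on the shared socket.
pids = 2.times.map do
  fork do
    loop do
      client = server.accept
      client.puts(Process.pid)           # report which worker served us
      client.close
    end
  end
end

# The kernel hands each new connection to one of the waiting workers.
served_by = 4.times.map do
  sock = TCPSocket.new('127.0.0.1', port)
  worker_pid = sock.gets.to_i
  sock.close
  worker_pid
end

pids.each { |pid| Process.kill(:TERM, pid) }
pids.each { |pid| Process.wait(pid) }
server.close
p served_by
```

Which worker answers each connection is up to the kernel, so the PIDs may repeat in any order.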
The Parallel Gem
The parallel gem provides the ability to achieve parallelism in CRuby (MRI) without getting into the nitty-gritty of forks and IPC. Here is a quick way to spin up all of your CPU cores with parallel.
parallel.rb
# Parallel iterates across collections with processes or threads
# Installation:
# $ gem install parallel
require 'parallel'
def calculate(magnitude)
  x = 0
  cycles = 10 ** magnitude
  cycles.times do
    x += 1.000001
  end
  return x
end
results = Parallel.map([6, 6, 6, 6, 7, 7, 7, 7]) do |mag|
  calculate(mag)
end
puts results
SitePoint already has a great article about the parallel gem that explores its inner workings (at least as of 2011).
Programmers often avoid looking at source code out of fear of it turning into a massive time sink. The parallel gem, however, is a fairly small, single-file library, so there’s a great opportunity to learn and contribute.
Conclusion
Even as other interpreters (JRuby, Rubinius) operate freely without a Global Interpreter Lock, a process-centric approach to concurrency is still useful. The more vertical an approach to scaling is, the more limited it will be. Running multiple threads on multiple cores might be more horizontal than increasing clock speed, but it is still more vertical than running multiple machines. Running 100 single core instances is more granular than running 25 quad core instances. Thinking in terms of processes sending messages enables you to embrace that kind of granularity. It won’t be easy, but once it works with a cluster of 10 instances, why not 100? Or 1000? Why not have 500 on one provider and 500 on another? They’re just processes sending messages.
Spending a bunch of money up front on resources you may or may not use represents the old way of thinking. Why buy more than exactly what you need?
Frequently Asked Questions (FAQs) about Forking and IPC in Ruby
What is the main difference between forking and threading in Ruby?
Forking and threading are both methods of achieving concurrency in Ruby, but they work in fundamentally different ways. Forking creates a new process that is a copy of the current one, with its own memory space. This means that changes made in the child process do not affect the parent process. Threading, on the other hand, creates multiple threads of execution within the same process, sharing the same memory space. This means that changes made in one thread can affect other threads. While forking can provide more isolation and can take advantage of multiple processors, it is also more resource-intensive than threading.
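A minimal illustration of that difference: a forked child increments its own copy of a variable, leaving the parent’s untouched, while a thread increments the very same variable.

```ruby
counter = 0

pid = fork { counter += 1 }          # changes the child's copy only
Process.wait(pid)
after_fork = counter                 # still 0 in the parent

Thread.new { counter += 1 }.join     # threads share the parent's memory
after_thread = counter               # now 1

p [after_fork, after_thread]
```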
How can I communicate between parent and child processes in Ruby?
Interprocess communication (IPC) in Ruby can be achieved through several methods, including pipes, sockets, and shared memory. Pipes are a simple and common method of IPC, allowing data to be written to one end and read from the other. Sockets are more flexible: UNIX domain sockets are bidirectional, and network (TCP) sockets also allow communication between processes on different machines. Shared memory allows for direct access to a common memory area, but requires careful synchronization to avoid conflicts.
What are the potential pitfalls of using forking in Ruby?
While forking can provide a powerful method of achieving concurrency in Ruby, it also comes with its own set of challenges. One of the main pitfalls is the potential for ‘zombie’ processes. These are child processes that have finished execution but have not been properly reaped by the parent process. If left unchecked, these can consume system resources and lead to performance issues. Another potential pitfall is the overhead associated with creating a new process, which can be significant if a large number of processes are being created.
How can I avoid ‘zombie’ processes when using forking in Ruby?
‘Zombie’ processes can be avoided by ensuring that the parent process properly reaps its child processes once they have finished execution. This can be done using the Process.wait method, which waits for a child process to exit and then reaps it. If you have multiple child processes, you can use Process.waitall to wait for and reap all of them.
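A short sketch of reaping in practice: Process.waitall reaps a batch of children and reports their exit statuses, while Process.detach suits fire-and-forget children by reaping them in a background thread.

```ruby
# Three children that exit with statuses 0, 1, and 2.
pids = 3.times.map { |i| fork { exit(i) } }

# waitall blocks until every child has exited and returns [[pid, status], ...].
statuses = Process.waitall.to_h
codes = pids.map { |pid| statuses[pid].exitstatus }
p codes

# Process.detach returns a thread that reaps the child when it exits.
reaper = Process.detach(fork { sleep 0.1 })
p reaper.value.success?              # Thread#value is the child's Process::Status
```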
How can I use sockets for IPC in Ruby?
Sockets provide a flexible method of IPC in Ruby: UNIX domain sockets connect processes on the same machine, while TCP sockets also allow communication between processes on different machines. To work at the lowest level, you first create a socket using the Socket.new method. You can then use the connect method to connect to a remote socket, or the bind, listen, and accept methods to set up a local socket that accepts connections. Data can be sent and received using the send and recv methods. For most programs, the higher-level TCPServer, TCPSocket, and UNIXSocket classes handle these steps for you.
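The low-level sequence looks roughly like this in a sketch where the parent listens on a loopback TCP port chosen by the OS (port 0) and a forked child connects to it and sends one message.

```ruby
require 'socket'

# Server side: bind to loopback on an OS-chosen port, then listen.
server = Socket.new(:INET, :STREAM)
server.bind(Socket.sockaddr_in(0, '127.0.0.1'))
server.listen(1)
port = server.local_address.ip_port

pid = fork do
  # Client side, in the child: connect to the parent's socket and send.
  client = Socket.new(:INET, :STREAM)
  client.connect(Socket.sockaddr_in(port, '127.0.0.1'))
  client.send("hello from #{Process.pid}", 0)
  client.close
end

conn, _client_addr = server.accept
message = conn.read                  # read until the child closes its socket
conn.close
server.close
Process.wait(pid)
puts message
```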
How can I use shared memory for IPC in Ruby?
Shared memory is a method of IPC that allows for direct access to a common memory area. Ruby’s standard library doesn’t provide it directly, but third-party gems such as mmap do: its Mmap class maps a file into memory, and the resulting object can be read from and written to much like a String. When the mapping is shared, other processes that map the same file see the same data.
How can I handle errors when using forking in Ruby?
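Error handling when using forking in Ruby relies on the usual begin and rescue keywords, with one caveat: an exception raised inside the child process stays in the child. Wrapping the call to fork in a begin block only catches errors from fork itself (for example, Errno::EAGAIN when the system cannot create another process). To handle errors from the child’s work, rescue them inside the fork block, and have the parent check the child’s exit status after Process.wait (via $?, a Process::Status). This allows you to gracefully handle errors and prevent them from crashing your program.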
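A sketch of that pattern, assuming the child reports the exception’s class name over a pipe and then exits with status 1 (both choices are illustrative, not a convention Ruby imposes).

```ruby
reader, writer = IO.pipe

pid = fork do
  reader.close
  begin
    Integer("not a number")          # raises ArgumentError inside the child
  rescue ArgumentError => e
    writer.puts(e.class.name)        # report what went wrong to the parent
    exit(1)                          # and signal failure through the exit status
  end
end

writer.close
error_name = reader.gets&.chomp      # nil if the child wrote nothing
reader.close
Process.wait(pid)
status = $?                          # Process::Status of the reaped child
puts "child failed (exit #{status.exitstatus}) with #{error_name}" unless status.success?
```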
How can I use pipes for IPC in Ruby?
Pipes are a simple and common method of IPC in Ruby. To use pipes, you can use the IO.pipe method to create a pair of pipe endpoints. Data can be written to one end of the pipe using the write method, and read from the other end using the read method.
How can I use threads for concurrency in Ruby?
Threads provide a lightweight method of achieving concurrency in Ruby. To create a new thread, you can use the Thread.new method, passing in a block of code to be executed in the thread. You can then use the join method to wait for the thread to finish execution. It’s important to note that in CRuby (MRI), the Global Interpreter Lock (GIL, also called the GVL) prevents threads from running Ruby code in parallel; they interleave instead, although threads blocked on I/O release the lock so their waits can overlap. JRuby, which has no GIL, does run threads in parallel.
How can I synchronize access to shared resources in Ruby?
Synchronization in Ruby can be achieved using several methods, including mutexes, condition variables, and queues. Mutexes provide a simple method of ensuring that only one thread can access a shared resource at a time. Condition variables can be used to signal between threads, allowing one thread to wait for a condition to be met by another thread. Queues provide a thread-safe way of passing data between threads.
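A minimal sketch of two of these tools: a Mutex guarding a shared counter, and a Queue (closed with Queue#close when the producer is done) passing items from one thread to another.

```ruby
# Mutex: only one thread at a time may update the shared counter.
counter = 0
lock = Mutex.new
workers = 4.times.map do
  Thread.new do
    1_000.times { lock.synchronize { counter += 1 } }
  end
end
workers.each(&:join)

# Queue: a thread-safe hand-off between a producer and a consumer.
queue = Queue.new
producer = Thread.new do
  3.times { |i| queue << i }
  queue.close                        # no more items are coming
end
received = []
while (item = queue.pop)             # pop returns nil once closed and empty
  received << item
end
producer.join

p [counter, received]
```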
Robert is a voracious reader, Ruby aficionado, and other big words. He is currently looking for interesting projects to work on and can be found at his website.