#367 Celluloid pro
If you ever need to work with concurrency in Ruby it’s worth taking a look at Celluloid which puts an object-oriented spin on multi-threaded development. With it we don’t need to worry about managing threads and mutexes but can instead focus on the objects themselves.
Launching Rockets With Celluloid
Celluloid comes as a gem and is installed in the usual way.
$ gem install celluloid
We’ll demonstrate Celluloid in a simple Ruby script that simulates a rocket launch. Note that this code sleeps for a second during each step of the countdown.
    class Rocket
      def launch
        3.downto(1) do |n|
          puts "#{n}..."
          sleep 1
        end
        puts "Blast off!"
      end
    end
We’ll run this code in irb to see what it does.
    >> Rocket.new.launch
    3...
    2...
    1...
    Blast off!
    => nil
    >>
This works as we expect but we don’t get the prompt back until the rocket has launched. Launching multiple rockets at the same time isn’t really possible as each countdown has to finish before the next one can start. We could use threads to give us the ability to launch more than one rocket at once but instead we’ll use Celluloid. To do this we require celluloid in our file and then include the Celluloid module in any class that we want to add concurrency to, like this:
    require 'celluloid'
    class Rocket
      include Celluloid
      def launch
        3.downto(1) do |n|
          puts "#{n}..."
          sleep 1
        end
        puts "Blast off!"
      end
    end
If we run this code now it behaves just like it did before so what does Celluloid give us? Well, now we can call any method in our class with an exclamation mark to have Celluloid trigger it asynchronously. If we launch a rocket with launch! we get the prompt back almost instantly but the rocket still counts down and blasts off.
    >> r = Rocket.new
    => #<Celluloid::Actor(Rocket:0x3fea5193ba90)>
    >> r.launch!
    => nil
    3...
    >> 2...
    1...
    Blast off!
The way this works is that each instance of a class that includes Celluloid exists in its own thread. If we list out the threads in irb with Thread.list.count we’ll see that we have three threads. When we make a new rocket this creates a new thread.
    >> Thread.list.count
    => 3
    >> r2 = Rocket.new
    => #<Celluloid::Actor(Rocket:0x3fea5194296c)>
    >> Thread.list.count
    => 4
We can now launch multiple rockets asynchronously without any problems.
    >> r.launch!; r2.launch!
    => nil
    3...>>
    3...
    2...
    2...
    1...
    1...
    Blast off!
    Blast off!
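For comparison, the thread-based approach that we passed over earlier might look something like the sketch below. It uses only Ruby’s standard library. PlainRocket, its delay argument and its log array are hypothetical names introduced for this illustration and aren’t part of Celluloid.

```ruby
# Standard-library sketch (no Celluloid): launch rockets with plain threads.
# PlainRocket, delay and log are hypothetical names used for illustration.
class PlainRocket
  attr_reader :log

  def initialize(delay = 1)
    @delay = delay
    @log = []
  end

  # Records each step of the countdown, pausing between steps.
  def launch
    3.downto(1) do |n|
      @log << "#{n}..."
      sleep @delay
    end
    @log << "Blast off!"
  end
end

# A short delay keeps the example quick to run.
rockets = [PlainRocket.new(0.01), PlainRocket.new(0.01)]

# Each launch runs in its own thread so the countdowns overlap.
threads = rockets.map { |rocket| Thread.new { rocket.launch } }

# Without join the script could exit before the rockets have finished.
threads.each(&:join)

rockets.each { |rocket| puts rocket.log.join(" ") }
```

Here we have to create and join the threads ourselves, which is exactly the bookkeeping that Celluloid takes off our hands.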
When we inspect a rocket object the output looks different from what we expect.
    ?> r.inspect
    => "#<Celluloid::Actor(Rocket:0x3fea5193ba90)>"
What we have is an instance of an Actor proxy which delegates its method calls to the Rocket instance. An Actor has the ability to be dead or alive and we can check this by calling alive? on it. To kill an actor we call terminate on it. A dead actor cannot receive any calls so if we call launch! on a dead rocket this will raise an exception. An actor can also die if it raises an exception. To demonstrate this we’ll raise an exception in the Rocket class’s launch method.
    require 'celluloid'
    class Rocket
      include Celluloid
      def launch
        # Simulate a crash as soon as the rocket is launched.
        raise "Houston, we have a problem."
        3.downto(1) do |n|
          puts "#{n}..."
          sleep 1
        end
        puts "Blast off!"
      end
    end
If we create a new Rocket now and call launch! on it that exception will be raised and the actor will be dead.
    >> r = Rocket.new
    => #<Celluloid::Actor(Rocket:0x3ff8a4468afc)>
    >> r.launch!
    E, [2012-07-30T19:31:55.650324 #81385] ERROR -- : Rocket crashed!
    RuntimeError: Houston, we have a problem.
    # Exception code omitted.
    => nil
    >> r.alive?
    => false
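A plain thread behaves in a loosely similar way, which may help to picture what is going on. The sketch below is only an analogy built on Ruby’s standard library, not a description of how Celluloid handles actors internally. The worker and failure variable names are ours.

```ruby
# Plain-thread analogy (not Celluloid): a crash inside a worker thread
# leaves the main thread running.
worker = Thread.new do
  # Silence Ruby's automatic crash report so the output stays readable.
  Thread.current.report_on_exception = false
  raise "Houston, we have a problem."
end

# Wait for the worker to finish without re-raising its exception here.
sleep 0.01 while worker.alive?

puts "Worker alive? #{worker.alive?}"
puts "Main thread still running"

# Thread#join re-raises the worker's exception in the calling thread,
# which lets us capture the reason for the failure.
failure = begin
  worker.join
  nil
rescue RuntimeError => e
  e.message
end
puts "Reason: #{failure}"
```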
When an exception is raised asynchronously like this it won’t interrupt the main thread or any of the other actors; these will happily continue as if nothing had gone wrong. Only the one actor is affected and marked as dead. If we want other actors to die when one does, Celluloid supports linking actors together: if any linked actor dies, all the actors it’s linked to die with it. To demonstrate this we’ll create a new Launcher class that also includes Celluloid and which can launch rockets.
    class Launcher
      include Celluloid
      def launch_rocket
        Rocket.new_link.launch!
      end
    end
We want to link a rocket with its launcher and we do this by calling new_link rather than new when we launch a rocket. If we open up irb now and create a new Launcher we get an exception when we try to launch a rocket.
    >> l = Launcher.new
    => #<Celluloid::Actor(Launcher:0x3fd4a9928480)>
    >> l.launch_rocket
    =>
    >> E, [2012-07-30T20:41:56.454178 #81712] ERROR -- : Rocket crashed!
    RuntimeError: Houston, we have a problem.
Both the Rocket and the Launcher will be dead now as they were linked. We don’t want our launcher to die but we can use this behaviour to recover gracefully when a rocket dies by calling trap_exit and passing a method that is triggered when a linked actor dies.
    class Launcher
      include Celluloid
      trap_exit :recover
      def launch_rocket
        Rocket.new_link.launch!
      end
      def recover(actor, reason)
        puts "Recovering"
      end
    end
This method is passed an actor and a failure reason, which in this case will be the exception. When we try launching a rocket through our launcher now it will recover gracefully so that our launcher will still be alive after its rocket crashes and dies. This is useful when we’re building fault-tolerant systems and we can simulate this in our code by altering our Rocket class so that it only has a chance of failing during its countdown.
    def launch
      3.downto(1) do |n|
        puts "#{n}..."
        sleep 1
        raise "Houston, we have a problem." if [true, false].sample
      end
      puts "Blast off!"
    end
Rather than simply recovering in the launcher we’ll launch a new rocket.
    class Launcher
      include Celluloid
      trap_exit :relaunch
      def launch_rocket
        Rocket.new_link.launch!
      end
      def relaunch(actor, reason)
        launch_rocket
      end
    end
Now our launcher will retry until a rocket is successfully launched.
    >> l = Launcher.new
    => #<Celluloid::Actor(Launcher:0x3fe69c8a0328)>
    >> l.launch_rocket
    3...
    => nil
    >> E, [2012-07-30T21:24:03.504697 #81972] ERROR -- : Rocket crashed!
    RuntimeError: Houston, we have a problem.
    /Users/eifion/code/rocket.rb:10:in `block in launch'
    /Users/eifion/code/rocket.rb:7:in `downto'
    /Users/eifion/code/rocket.rb:7:in `launch'
    /Users/eifion/.rvm/gems/ruby-1.9.3-p194/gems/celluloid-0.11.1/lib/celluloid/calls.rb:99:in `dispatch'
    /Users/eifion/.rvm/gems/ruby-1.9.3-p194/gems/celluloid-0.11.1/lib/celluloid/actor.rb:240:in `block in handle_message'
    /Users/eifion/.rvm/gems/ruby-1.9.3-p194/gems/celluloid-0.11.1/lib/celluloid/task.rb:45:in `block in initialize'
    3...
    2...
    1...
    Blast off!
If we take a look at our thread count we’ll see that there are only four threads even though we’ve created far more rockets. This is because Celluloid doesn’t let a dead actor hang around using up a thread. If we create a large number of rockets at the same time, say 100, these will all be live actors and each takes up a separate thread.
    ?> Thread.list.size
    => 4
    >> 100.times { Rocket.new }
    => 100
    >> Thread.list.size
    => 104
This is something that we’ll need to keep in mind. Once we’ve finished with a Celluloid object we should terminate it manually instead of relying on garbage collection.
Supervisors
Getting back to fault-tolerant systems Celluloid offers something called Supervisors which will automatically restart an actor if it dies. To demonstrate these we’ll give a rocket the ability to automatically launch when it’s instantiated by adding an initialize method with an autolaunch argument.
    def initialize(autolaunch = false)
      launch! if autolaunch
    end
We can now create a new Rocket and tell it to autolaunch. Of course this might crash so instead of just calling new we’ll call supervise with the same arguments. When we do this Celluloid will set up a supervisor which will restart the rocket if it crashes.
>> Rocket.supervise(true)
When we run this a couple of rockets crash but eventually one launches successfully. This supervisor has essentially duplicated the behaviour of our launcher, but this is built into Celluloid.
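To get a feel for what a supervisor does for us, here’s a rough standard-library sketch of the restart-on-crash idea. It isn’t how Celluloid implements supervision. The supervise_with_threads method, its max_attempts option and the attempt counter are hypothetical names made up for this example, and the failure is made deterministic (the first two attempts crash) so that the outcome is predictable.

```ruby
# Hypothetical helper (not part of Celluloid): run the given block in a fresh
# thread, restarting it after each crash, and return the number of attempts
# that were needed for it to finish cleanly.
def supervise_with_threads(max_attempts: 10)
  attempts = 0
  loop do
    attempts += 1
    worker = Thread.new do
      # Silence Ruby's automatic crash report; we report crashes ourselves.
      Thread.current.report_on_exception = false
      yield attempts
    end
    begin
      worker.join # re-raises any exception raised inside the worker
      return attempts
    rescue RuntimeError => e
      puts "Attempt #{attempts} crashed: #{e.message}"
      raise if attempts >= max_attempts # give up rather than loop forever
    end
  end
end

# Crash on the first two attempts, then launch successfully.
attempts = supervise_with_threads do |attempt|
  raise "Houston, we have a problem." if attempt < 3
  puts "Blast off!"
end

puts "Launched after #{attempts} attempts"
```

Capping the number of attempts is a design choice of this sketch, made so that a rocket that can never launch doesn’t keep us retrying indefinitely.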
A Different Use For Celluloid
We’ll shift gears now and focus on an example that’s a little more practical than launching rockets. Instead we’ll look at a feed counting script.
    require 'rss'
    require 'open-uri'
    class FeedCounter
      def initialize(url)
        @url = url
      end
      def count
        open(@url) do |f|
          rss = RSS::Parser.parse(f.read, false)
          count = rss.items.size
          puts "#{count} in #{@url}"
          count
        end
      end
    end
    counts = $*.map { |url| FeedCounter.new(url).count }
    total = counts.inject(:+)
    puts "#{total} total" if total
The file takes a list of RSS feed URLs and for each one makes a new FeedCounter instance, passing in that URL then calling count on it to count the number of items in the feed. This count method parses the feed, counts the number of items in it and prints the count out. Finally the script sums up the counts from each feed and prints out the total. We’ll run this script now and pass in a list of RSS feed URLs.
    $ ruby feed_counter.rb http://rss.cnn.com/rss/cnn_topstories.rss http://feeds.feedburner.com/railscasts http://stackoverflow.com/feeds
    56 in http://rss.cnn.com/rss/cnn_topstories.rss
    328 in http://feeds.feedburner.com/railscasts
    30 in http://stackoverflow.com/feeds
    414 total
The script fetches each feed one at a time but it would perform much better if it fetched the feeds concurrently so we’ll use Celluloid to do this. We’ll require Celluloid in the file and include it in the FeedCounter class.
    require 'rss'
    require 'open-uri'
    require 'celluloid'
    class FeedCounter
      include Celluloid
      def initialize(url)
        @url = url
      end
      def count
        open(@url) do |f|
          rss = RSS::Parser.parse(f.read, false)
          count = rss.items.size
          puts "#{count} in #{@url}"
          count
        end
      end
    end
    counts = $*.map { |url| FeedCounter.new(url).count }
    total = counts.inject(:+)
    puts "#{total} total" if total
To make the call to count asynchronous we can add an exclamation mark to it. Doing this will make it return nil but we need it to return the number of items in the feed so that we can add the counts up for the total. Celluloid has a feature called Futures that can help here and we’ll demonstrate how it works in irb.
    >> f = FeedCounter.new("http://feeds.feedburner.com/railscasts")
    => #<Celluloid::Actor(FeedCounter:0x3fd83219a218) @url="http://feeds.feedburner.com/railscasts">
    >> future = f.future(:count)
    => #<Celluloid::Future:0x007fb06432b948>
    >> 328 in http://feeds.feedburner.com/railscasts
    ?> future.value
    => 328
First we make a new FeedCounter object with a given URL. We then call future on this object, which is a method that Celluloid provides, and pass it the name of the method that we want to trigger, in this case count. This returns a Future object and triggers count asynchronously, which is why we see its output appear after the prompt. We can call value on this object to get the value that the method returned.
This means that our script can do something else while it is parsing an RSS feed and fetch the count value at a later time. If we call future.value before the asynchronous method has completed it will wait until it completes so that it can return the value. We’ll make the changes necessary to add concurrency to our script now. Instead of calling count directly we’ll use future to call it asynchronously. Mapping over the URLs this way gives us an array of Future objects which we’ll grab the values from to calculate the total.
    require 'rss'
    require 'open-uri'
    require 'celluloid'
    class FeedCounter
      include Celluloid
      def initialize(url)
        @url = url
      end
      def count
        open(@url) do |f|
          rss = RSS::Parser.parse(f.read, false)
          count = rss.items.size
          puts "#{count} in #{@url}"
          count
        end
      end
    end
    futures = $*.map { |url| FeedCounter.new(url).future(:count) }
    total = futures.map(&:value).inject(:+)
    puts "#{total} total" if total
Our script will now run much more quickly as it fetches each feed asynchronously.
    $ ruby feed_counter.rb http://rss.cnn.com/rss/cnn_topstories.rss http://feeds.feedburner.com/railscasts http://stackoverflow.com/feeds
    30 in http://stackoverflow.com/feeds
    57 in http://rss.cnn.com/rss/cnn_topstories.rss
    328 in http://feeds.feedburner.com/railscasts
    415 total
    I, [2012-07-31T20:09:23.962723 #87003] INFO -- : Terminating 4 actors...
    I, [2012-07-31T20:09:23.963616 #87003] INFO -- : Shutdown completed cleanly
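The start-now, collect-later behaviour of a future can be approximated with Ruby’s own Thread#value, which blocks until the thread has finished and then returns the result of its block. The sketch below is an analogy rather than Celluloid’s implementation. The slow_count method and the in-memory feeds hash are hypothetical stand-ins for downloading and parsing real feeds.

```ruby
# Hypothetical stand-in for a slow feed download: waits, then counts items.
def slow_count(items, delay = 0.05)
  sleep delay
  items.size
end

# Made-up data in place of real RSS feeds.
feeds = { "feed-a" => [1, 2, 3], "feed-b" => [4, 5] }

# Start all of the work first; each thread plays the role of a future.
futures = feeds.values.map { |items| Thread.new { slow_count(items) } }

# Thread#value waits for the thread to finish and returns its result,
# much like calling value on a Celluloid future.
counts = futures.map(&:value)
total = counts.inject(:+)

puts "#{total} total"
```

Because every thread is started before any value is requested, the waits overlap instead of adding up.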
Pools
If we passed this script hundreds of feeds it would currently try to download and parse them all at the same time which isn’t the best thing to do. Celluloid allows us to set up a pool of actors. By using this we can throw as much work as we want at it and it will only process a job when an actor is available to do so. This is also fault-tolerant so that if an actor crashes a new one will automatically be started up.
For this to work we need to rearrange our code a little. Instead of initializing a FeedCounter with a URL we’ll pass it in to the count method so that it becomes a local variable instead of an instance variable. This way all feed counters will be the same, so we now pass each URL into the call to future. We can now set up a pool of feed counters by calling FeedCounter.pool and instead of calling future on a FeedCounter object we can call it on the pool.
    require 'rss'
    require 'open-uri'
    require 'celluloid'
    class FeedCounter
      include Celluloid
      def count(url)
        open(url) do |f|
          rss = RSS::Parser.parse(f.read, false)
          count = rss.items.size
          # url is now a method argument, not an instance variable.
          puts "#{count} in #{url}"
          count
        end
      end
    end
    pool = FeedCounter.pool(size: 6)
    # Call future on the pool itself so that an available worker picks up the job.
    futures = $*.map { |url| pool.future(:count, url) }
    total = futures.map(&:value).inject(:+)
    puts "#{total} total" if total
The number of workers in a pool defaults to the number of cores on our machine, but we can pass a size option to override this, as we’ve done here. If we run this script now and pass it a large number of feeds it will only process up to six at a time.
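The idea of a fixed number of workers pulling jobs from a shared queue can be sketched with Ruby’s standard library alone. This is only an illustration of the concept and not how Celluloid’s pools are implemented. The process_in_pool helper is a hypothetical name, and the sketch assumes Ruby 2.3 or later for Queue#close.

```ruby
# Hypothetical helper (not part of Celluloid): run every job through the block
# using at most `size` worker threads, returning results in job order.
def process_in_pool(jobs, size: 6)
  queue = Queue.new
  jobs.each_with_index { |job, index| queue << [job, index] }
  # Closing the queue makes pop return nil once it is empty (Ruby 2.3+),
  # which tells each worker that there is no more work to do.
  queue.close

  results = Array.new(jobs.size)
  workers = Array.new(size) do
    Thread.new do
      while (item = queue.pop)
        job, index = item
        results[index] = yield(job)
      end
    end
  end

  workers.each(&:join)
  results
end

lengths = process_in_pool(%w[a bb ccc dddd], size: 2) { |word| word.length }
p lengths
```

However many jobs we queue up, no more than size of them are in progress at any moment, which is the property that keeps a long list of feeds from being downloaded all at once.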
Celluloid is an impressive solution for handling concurrency. We haven’t had to change much in our script to make it concurrent and our script would be much more complex if we had to deal with threads and mutexes directly. There’s much more to it than we’ve covered here and it’s worth reading through the wiki to see what’s available. One feature we haven’t covered is Timers which give us the ability to trigger code in the future. The other pages that are worth reading are the ones on Frequently Asked Questions, Blocks, Gotchas and Thread Safety Notes. For a real-life example of Celluloid in action take a look at Sidekiq, which was covered in the previous episode, specifically the manager, fetcher and processor classes. Each of these uses Celluloid.