Rob Ares

Safe Shutdown Of EventMachine Reactors

Recently I found myself trying to devise a way to prevent a process from shutting down until an EventMachine reactor loop had finished performing some network IO. I found a particular pattern quite effective and wanted to share it here.

In case you haven't done much work with EventMachine, it provides a mixin (called a Deferrable) that lets the developer model activity in the reactor loop as callbacks (success and failure). Using this pattern, we can create a class that registers pieces of work and unregisters them when the work is complete. That lets us decide whether it is safe to shut down a process based on whether any work is still outstanding.
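To make the callback model concrete, here is a minimal sketch of the contract a Deferrable exposes. `TinyDeferrable` is a hypothetical pure-Ruby stand-in written for illustration so the snippet runs without the gem. It only imitates the `callback`, `errback`, `succeed` and `fail` methods that EventMachine's Deferrable provides, and it simplifies things: no timeouts, and a deferrable settles only once.

```ruby
# Hypothetical stand-in for EM::Deferrable (illustration only, not the gem's code).
class TinyDeferrable
  def initialize
    @callbacks = []
    @errbacks  = []
    @status    = :unknown
    @args      = []
  end

  # Run the block on success (immediately if already succeeded).
  def callback(&block)
    if @status == :succeeded
      block.call(*@args)
    elsif @status == :unknown
      @callbacks << block
    end
    self
  end

  # Run the block on failure (immediately if already failed).
  def errback(&block)
    if @status == :failed
      block.call(*@args)
    elsif @status == :unknown
      @errbacks << block
    end
    self
  end

  def succeed(*args)
    settle(:succeeded, @callbacks, args)
  end

  def fail(*args)
    settle(:failed, @errbacks, args)
  end

  private

  # Record the outcome once and fire the matching blocks.
  def settle(status, blocks, args)
    return unless @status == :unknown
    @status = status
    @args   = args
    blocks.each { |b| b.call(*args) }
    @callbacks.clear
    @errbacks.clear
  end
end

events = []
upload = TinyDeferrable.new
upload.callback { |msg| events << [:ok, msg] }
upload.errback  { |err| events << [:failed, err] }
upload.succeed("message delivered")  # only the callback block runs
p events
```

The point to take away is that exactly one of the two blocks runs per piece of work, which is what makes it possible to count work in and out.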

The exact situation I am trying to solve is the following: I have a client that pulls messages off a queue and sends them to another system (via another protocol). At any given time, there are N IO jobs active in the event loop. But what happens when we send TERM to this process? EventMachine starts closing the open connections, and network calls that we want to succeed will fail. What I wanted was a way to register individual pieces of work (pulling the message off the queue and pushing it to the remote service) and to check whether all the work is done (and thus whether it is safe to let the process exit).

So consider the following class:

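class DeferrableProgressMonitor
  attr_reader :total

  def initialize
    # @marked holds deferrables still in flight; @total counts every registration.
    @marked, @total = [], 0
  end

  def register(deferrable)
    @total += 1
    @marked << deferrable

    # Only the number of entries matters, so drop one entry per finished
    # deferrable, whether it succeeded or failed.
    deferrable.callback { |object| @marked.pop }
    deferrable.errback { |e, object| @marked.pop }
  end

  alias :<< :register

  # Assumed body: the number of deferrables still in flight.
  def current
    @marked.size
  end

  # True while at least one registered deferrable has not finished.
  def wait?
    @total > 0 && !@marked.empty?
  end
end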
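To see how `wait?` changes as work completes, here is a small walk-through. It is a sketch under assumptions: `StubDeferrable` is a hypothetical stand-in for a real Deferrable (it just stores the blocks and fires them on `succeed` or `fail`), and a trimmed copy of the monitor is included only so the snippet runs on its own.

```ruby
# Trimmed copy of the monitor, repeated so this snippet is self-contained.
class DeferrableProgressMonitor
  attr_reader :total

  def initialize
    @marked, @total = [], 0
  end

  def register(deferrable)
    @total += 1
    @marked << deferrable
    deferrable.callback { |object| @marked.pop }
    deferrable.errback  { |e, object| @marked.pop }
  end
  alias :<< :register

  def wait?
    @total > 0 && !@marked.empty?
  end
end

# Hypothetical stand-in for a Deferrable: records blocks, fires them on demand.
class StubDeferrable
  def initialize
    @callbacks, @errbacks = [], []
  end

  def callback(&block)
    @callbacks << block
  end

  def errback(&block)
    @errbacks << block
  end

  def succeed(*args)
    @callbacks.each { |b| b.call(*args) }
  end

  def fail(*args)
    @errbacks.each { |b| b.call(*args) }
  end
end

monitor = DeferrableProgressMonitor.new
puts monitor.wait?   # false: nothing registered yet

first, second = StubDeferrable.new, StubDeferrable.new
monitor << first
monitor << second
puts monitor.wait?   # true: two jobs outstanding

first.succeed("ok")
second.fail("boom")
puts monitor.wait?   # false: both jobs finished, one way or the other
```

Note that a failed job counts as finished. The monitor answers "is anything still in flight?", not "did everything succeed?".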

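Here is a small piece of code that uses it:

EM.run do
  monitor = DeferrableProgressMonitor.new

  trap("TERM") do
    # Stop the reactor only when no registered work is outstanding.
    EM.stop unless monitor.wait?
  end

  monitor << thing_that_returns_a_deferrable
end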

So the signal trap checks whether there is any outstanding work left. If there is, we keep on going (my actual code sets a timer that keeps checking #wait?). Otherwise, we shut the loop down.
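The timer variant mentioned above could look roughly like the sketch below. This is an assumption about the shape of such code, not the code I actually run: `ShutdownGate` and `FakeMonitor` are hypothetical names. The trap handler only flips a flag, and a periodic check decides when to stop. Inside a reactor the check would be scheduled with `EM.add_periodic_timer` and the stop action would be `EM.stop`; here a plain loop stands in for the timer so that the snippet runs without the gem.

```ruby
# Hypothetical helper: decides on each tick whether the loop may stop.
class ShutdownGate
  def initialize(monitor, &on_stop)
    @monitor   = monitor   # anything that responds to wait?
    @on_stop   = on_stop   # e.g. a block calling EM.stop in a real reactor
    @requested = false
    @stopped   = false
  end

  # Meant to be called from the signal trap; only flips a flag.
  def request!
    @requested = true
  end

  def stopped?
    @stopped
  end

  # Meant to be called periodically. Stops once shutdown was requested
  # and the monitor reports no outstanding work.
  def tick
    return if @stopped || !@requested || @monitor.wait?
    @stopped = true
    @on_stop.call
  end
end

# Stand-in monitor: reports pending work for a fixed number of checks.
FakeMonitor = Struct.new(:pending) do
  def wait?
    return false if pending.zero?
    self.pending -= 1
    true
  end
end

log  = []
gate = ShutdownGate.new(FakeMonitor.new(2)) { log << :stopped }

gate.tick            # no shutdown requested yet, so nothing happens
gate.request!        # what trap("TERM") { gate.request! } would do
ticks = 0
until gate.stopped?  # stands in for the periodic timer
  gate.tick
  ticks += 1
end
puts "stopped after #{ticks} ticks"
```

Keeping the trap handler down to a flag flip is a deliberate choice in this sketch: very little is safe to do inside a signal handler, so the real decision is left to code running in the loop itself.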

There are a few gotchas here:
