The Stages Gem

Thursday, January 19, 2012 by Nathan Acuff
If you have a lot of data, eventually you will probably want to do something with it.  All data processing and analysis tasks start off with the best of intentions, but from the day a processing task is written, it begins to rot.  These jobs tend to decay into unmaintainable slop even faster than normal code, and it's easy to see why:
  • They tend not to be mission critical
  • They tend to have little to no test coverage
  • They tend to be modified with a 'quick and dirty' mindset
In an effort to clean things up, iGoDigital has produced a gem to help us straighten out some of our own analysis tasks.  Stages is designed to help you write simple, reusable stream processors and wire them together with intuitive syntax.  It started as a straight implementation of Dave Thomas's code and has changed only a bit.  We aim to turn this:

data_chunks.each do |chunk|
  chunk_array = get_you_some_more_data
  chunk_array.map!{|x| format_for_client(x)}
  chunk_array.map!{|x| maybe_more_formatting(x)} if client.extra_work?
  chunk_array.each do |item|
    maybe_write_or_whatever(item)
  end
end

Into this:

get_chunks | fill_more_data | (client.extra_work? ? extra_format : format) | write

Stages lets you build a pipeline of Stage objects that functions as a chain of iterators.  The left-hand side of a pipeline should output your source data.  If you already have your source data in an array or other enumerable, you can use the Each stage to feed it out.  Otherwise, you will want to write a custom source Stage that overrides the process method, like this:

class Evens < Stage
  def process
    value = 0
    loop do
      output value
      value += 2
    end
  end
end

This stage emits 0, 2, 4, 6, and so on.  Infinite loops like this one are perfectly fine, as long as you call output regularly: each call hands a value to the next stage and pauses the source until another value is requested.
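If you know Ruby's Enumerator, the laziness that makes this safe has a close plain-Ruby analogy. The sketch below uses only the standard library, not the Stages gem: the Enumerator's yielder stands in for output, and the infinite loop advances only when a consumer asks for another value.

```ruby
# Plain-Ruby analogy, NOT the Stages gem: the yielder plays the role of output.
evens = Enumerator.new do |yielder|
  value = 0
  loop do
    yielder << value # hand one value to the consumer, then pause here
    value += 2
  end
end

# Only five iterations of the "infinite" loop ever run.
p evens.first(5) # => [0, 2, 4, 6, 8]
```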
Other stages override the handle_value method, which receives a single input value and calls output to pass results on to the next stage.  For convenience, we have defined two general-purpose processing stages, Map and Select.  Both take blocks and do what you'd expect:

Evens.new | Map.new{|x| x * 2} # outputs 0 4 8 12 ...
Evens.new | Select.new{|x| x % 4 == 0} # outputs 0 4 8 12 ...
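To make the process / handle_value / output contract concrete, here is a minimal sketch of a pull-based pipeline built on Ruby Fibers. It is a hypothetical toy, not the gem's source: the MiniStages module, the run method, the source accessor, and the first_values helper are assumptions made for illustration.

```ruby
# Hypothetical toy re-implementation, NOT the Stages gem's source.
module MiniStages
  class Stage
    attr_accessor :source

    # Connect this stage to the next one and return the downstream stage,
    # so that a | b | c builds a chain.
    def |(other)
      other.source = self
      other
    end

    # Pull the next value out of this stage, or nil once it is exhausted.
    def run
      return nil if @done
      @fiber ||= Fiber.new do
        process
        @done = true
        nil
      end
      @fiber.resume
    end

    # Default behavior: feed every upstream value to handle_value.
    def process
      while (value = source.run)
        handle_value(value)
      end
    end

    # Suspend this stage's fiber and hand value to whoever called run.
    def output(value)
      Fiber.yield(value)
    end
  end

  # Source stage from the post: overrides process and never terminates.
  class Evens < Stage
    def process
      value = 0
      loop do
        output value
        value += 2
      end
    end
  end

  # Transform each value with the block.
  class Map < Stage
    def initialize(&block)
      @block = block
    end

    def handle_value(value)
      output @block.call(value)
    end
  end

  # Pass through only the values for which the block is truthy.
  class Select < Stage
    def initialize(&block)
      @block = block
    end

    def handle_value(value)
      output value if @block.call(value)
    end
  end
end

# Hypothetical helper: collect the first n values a pipeline produces.
def first_values(stage, n)
  Array.new(n) { stage.run }
end

doubled = MiniStages::Evens.new | MiniStages::Map.new { |x| x * 2 }
p first_values(doubled, 4) # => [0, 4, 8, 12]

multiples_of_four = MiniStages::Evens.new | MiniStages::Select.new { |x| x % 4 == 0 }
p first_values(multiples_of_four, 4) # => [0, 4, 8, 12]
```

Using nil as the end-of-stream marker keeps the sketch short, but it means a stage in this toy cannot pass nil or false downstream; a more complete implementation would need a distinct sentinel.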

For most tasks, we recommend implementing your own stages, so that you can get more code reuse, but we have also provided a few more stages for common stream processing tasks:
Count runs its upstream stages to exhaustion and produces a hash of {value => number of occurrences}.  Be careful using it after stages that never stop producing values (like our Evens example): in that case Count never returns.
Emit outputs the value it is initialized with exactly once, which is useful for testing and for wrapping sub-pipelines.
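Count and Emit sit at opposite ends of a pipeline, and their behavior is easy to approximate in plain Ruby. This is not the gem's implementation; count_values is a hypothetical helper. The sketch shows why Count needs a finite upstream: the tally only returns once its input is exhausted, so an infinite source has to be cut off first.

```ruby
# Hypothetical stand-in for Count: tally every value of a finite stream.
# Like Count, it only returns after the input is exhausted.
def count_values(values)
  values.each_with_object(Hash.new(0)) { |value, counts| counts[value] += 1 }
end

counts = count_values(%w[a b a c a b])
p counts["a"] # => 3
p counts["c"] # => 1

# An infinite source (like Evens) must be truncated before counting,
# otherwise the tally never finishes.
evens = Enumerator.new do |yielder|
  value = 0
  loop do
    yielder << value
    value += 2
  end
end
p count_values(evens.first(3).map { |x| x % 4 }).to_a # => [[0, 2], [2, 1]]

# Emit-style source: a one-element enumerator yields its value once;
# a second emit.next would raise StopIteration.
emit = [:hello].each
p emit.next # => :hello
```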
Each has several modes of operation.  If it is initialized with a value, it will run the value through its block (if any), and then call .each on the result, outputting each value.  If not, it will run its block (if any) on each input, and then call .each on the result.  For example:

Each.new('foo'){|x| x.chars} # 'f' 'o' 'o'
Each.new(['foo', 'bar']) | Each.new{|x| x.chars} # 'f' 'o' 'o' 'b' 'a' 'r'
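The two modes of Each can be approximated with plain Ruby enumerators. This is a sketch under our reading of the description above, not the gem's code; each_from_value and each_from_inputs are hypothetical names for the "initialized with a value" and "fed by upstream inputs" modes.

```ruby
# Hypothetical approximation of Each's first mode: start from a fixed value,
# optionally transform it with the block, then output every element.
def each_from_value(value, &block)
  Enumerator.new do |out|
    result = block ? block.call(value) : value
    result.each { |item| out << item }
  end
end

# Hypothetical approximation of Each's second mode: transform each upstream
# input (if a block is given), then output every element of each result.
def each_from_inputs(inputs, &block)
  Enumerator.new do |out|
    inputs.each do |input|
      result = block ? block.call(input) : input
      result.each { |item| out << item }
    end
  end
end

letters = each_from_value('foo') { |x| x.chars }
p letters.to_a # => ["f", "o", "o"]

words = each_from_value(%w[foo bar])
all_letters = each_from_inputs(words) { |x| x.chars }
p all_letters.to_a # => ["f", "o", "o", "b", "a", "r"]
```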

Wrap creates a sub-pipeline.  Say you have some data that maps to a larger but related set, or you just want to keep part of your pipeline isolated.  Wrap passes each of its inputs to the child pipeline and outputs, by default, a hash of {input_value => [pipeline results]}.  If you prefer, you can get just the results as an array, or have Wrap output each result individually (though in that case, Wrap adds less value).  If you call Wrap with :aggregated, it assumes that the sub-pipeline aggregates on its own (for example, with the Count stage) and uses only the first element of the results array.
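Wrap's output modes are easiest to compare side by side. The helper below is a hypothetical approximation, not the gem's API: the wrap method, the mode symbols :hash, :array, and :each, and the use of a lambda as the sub-pipeline are assumptions; only :aggregated comes from the description above. It runs the sub-pipeline once per input and then shapes the results.

```ruby
# Hypothetical approximation of Wrap; mode names other than :aggregated are assumed.
def wrap(inputs, mode = :hash, &sub_pipeline)
  # Run the sub-pipeline once per input, keeping each input next to its results.
  pairs = inputs.map { |input| [input, sub_pipeline.call(input).to_a] }

  case mode
  when :hash       then pairs.to_h                                    # {input => [results]}
  when :array      then pairs.map { |_input, results| results }       # results only
  when :each       then pairs.flat_map { |_input, results| results }  # every result, flattened
  when :aggregated then pairs.map { |input, results| [input, results.first] }.to_h
  else raise ArgumentError, "unknown mode: #{mode}"
  end
end

letters = ->(word) { word.chars }
p wrap(%w[foo bar], :array, &letters) # => [["f", "o", "o"], ["b", "a", "r"]]
p wrap(%w[foo bar], :each, &letters)  # => ["f", "o", "o", "b", "a", "r"]

# With :aggregated, the sub-pipeline produces one summary value per input,
# here a letter count similar to what Count would emit.
letter_counts = ->(word) { [word.chars.each_with_object(Hash.new(0)) { |c, h| h[c] += 1 }] }
p wrap(%w[foo], :aggregated, &letter_counts)["foo"]["o"] # => 2
```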

Similar to Wrap, but even more confusing, are Restrict and Resume.  Together they are roughly equivalent to wrapping all the stages between them, but without the sub-pipeline semantics.  Restrict holds a value and runs everything downstream until it reaches a Resume, which aggregates the results, joins them to the held value, and then re-initializes all the stages between the two.
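The step that distinguishes Restrict/Resume from plain chaining is the re-initialization. The sketch below approximates it in plain Ruby under our reading of the description: restrict_resume and RunningTotal are hypothetical, a factory lambda stands in for rebuilding the stages between Restrict and Resume, and each held value is an array the inner stage walks through. Because the inner stage keeps state, rebuilding it for every held value stops one value's results from leaking into the next.

```ruby
# Hypothetical stateful inner stage: emits a running total of what it has seen.
class RunningTotal
  def initialize
    @total = 0
  end

  def call(value)
    @total += value
  end
end

# Hypothetical approximation of Restrict ... Resume.
def restrict_resume(held_values, build_inner)
  held_values.map do |held|
    inner = build_inner.call                 # re-initialize the stages in between
    results = held.map { |v| inner.call(v) } # run them over this held value
    [held, results]                          # Resume: join the results to the held value
  end
end

batches = [[1, 2, 3], [10, 20]]
p restrict_resume(batches, -> { RunningTotal.new })
# => [[[1, 2, 3], [1, 3, 6]], [[10, 20], [10, 30]]]

# Without re-initialization, state leaks between held values.
shared = RunningTotal.new
p restrict_resume(batches, -> { shared }).last # => [[10, 20], [16, 36]]
```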

A few examples using Wrap and Restrict/Resume are in the examples directory of our GitHub repository.
That's Stages in a nutshell.  Grab the gem (gem install stages), check it out on GitHub, and let us know what you're doing with it!
