class Cascading::SubAssembly

Allows you to plug c.p.SubAssemblies (that is, cascading.pipe.SubAssembly instances) into a cascading.jruby Assembly.

Assumptions:

This is a low-level tool, so be careful.

Attributes

assembly[R]
scope[R]
sub_assembly[R]
tail_pipe[R]

Public Instance Methods

finalize(pipes, incoming_scopes)

Complete the addition of the SubAssembly to the Assembly. Propagates Scope through the SubAssembly and updates the #tail_pipe of the SubAssembly for passing back to the enclosing Assembly. May accept many incoming pipes, but typically receives only the #tail_pipe of the enclosing Assembly. Returns the pair [tail_pipe, scope].

# File lib/cascading/sub_assembly.rb, line 32
def finalize(pipes, incoming_scopes)
  # Build adjacency list for sub_assembly
  graph = {}
  adjacency_list(pipes, sub_assembly.tails.first, graph)

  # Group adjacency list by next_pipe
  incoming_edges = graph.inject({}) do |incoming_edges, (prev_pipe, next_pipe)|
    incoming_edges[next_pipe] ||= []
    incoming_edges[next_pipe] << prev_pipe
    incoming_edges
  end

  # Propagate scope through sub_assembly graph
  inputs = Hash[*pipes.zip(incoming_scopes).flatten]
  while !incoming_edges.empty?
    incoming_edges.each do |next_pipe, prev_pipes|
      if (prev_pipes - inputs.keys).empty?
        input_scopes = prev_pipes.inject([]) do |input_scopes, prev_pipe|
          input_scopes << inputs.delete(prev_pipe)
          input_scopes
        end
        inputs[next_pipe] = Scope.outgoing_scope(next_pipe, input_scopes)
        incoming_edges.delete(next_pipe)
      end
    end
  end

  raise "Incoming edges did not capture all inputs; #{inputs.size} remaining" unless inputs.size == 1
  @tail_pipe, @scope = inputs.first
  raise "Expected scope propagation to end with tail pipe; ended with '#{@tail_pipe}' instead" unless sub_assembly.tails.first == @tail_pipe

  # This is the same "fix" applied to our field name metadata after a
  # sequence of Everies in Aggregations.  It just so happens that all of
  # CountBy, SumBy, and AverageBy end with Everies.  However, it appears to
  # only be necessary for AverageBy (which has different declaredFields for
  # its partials than its final, unlike the other two).  It would be nice
  # to track this issue down so that we can remove this hack from here and
  # Aggregations#finalize.
  discard_each = Java::CascadingPipe::Each.new(tail_pipe, all_fields, Java::CascadingOperation::Identity.new)
  @scope = Scope.outgoing_scope(discard_each, [scope])

  [@tail_pipe, @scope]
end
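
The scope propagation performed by finalize can be sketched without Cascading. This is a minimal illustration under stated assumptions, not the library's code: pipes are plain symbols, scopes are arrays of field names, and merge_scopes is a hypothetical stand-in for Scope.outgoing_scope. Unlike the listing above, the sketch raises when no pipe is ready rather than continuing to loop.

```ruby
# Hypothetical stand-in for Scope.outgoing_scope: the outgoing scope is
# simply the union of the incoming field names.
def merge_scopes(_next_pipe, input_scopes)
  input_scopes.flatten.uniq
end

# edges:  array of [prev_pipe, next_pipe] pairs (the adjacency list)
# inputs: hash of pipe => scope for the pipes entering the graph
def propagate(edges, inputs)
  # Group the adjacency list by next_pipe
  incoming_edges = edges.each_with_object({}) do |(prev_pipe, next_pipe), acc|
    (acc[next_pipe] ||= []) << prev_pipe
  end

  inputs = inputs.dup
  until incoming_edges.empty?
    # A pipe is ready once every one of its predecessors has a scope
    next_pipe, prev_pipes = incoming_edges.find do |_, prevs|
      (prevs - inputs.keys).empty?
    end
    raise 'No pipe is ready; scope cannot be propagated further' unless next_pipe

    input_scopes = prev_pipes.map { |prev_pipe| inputs.delete(prev_pipe) }
    inputs[next_pipe] = merge_scopes(next_pipe, input_scopes)
    incoming_edges.delete(next_pipe)
  end
  inputs
end

# Two inputs joined into one pipe, followed by a single tail
edges  = [[:left, :join], [:right, :join], [:join, :tail]]
inputs = { left: %w[id name], right: %w[id total] }
result = propagate(edges, inputs)
p result
```

When propagation succeeds on a graph with one tail, exactly one entry remains in the returned hash, which is the condition finalize checks before assigning @tail_pipe and @scope.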

Public Class Methods

new(assembly, sub_assembly)

Construct a SubAssembly within the given Assembly. Raises unless the wrapped sub_assembly has set exactly one tail.

# File lib/cascading/sub_assembly.rb, line 16
def initialize(assembly, sub_assembly)
  @assembly = assembly
  @sub_assembly = sub_assembly
  @tail_pipe = assembly.tail_pipe
  @scope = assembly.scope

  # Enforces 1 tail pipe assumption
  raise 'SubAssembly must call setTails in constructor' unless sub_assembly.tails
  raise 'SubAssembly must set exactly 1 tail in constructor' unless sub_assembly.tails.size == 1
end
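
The single-tail check in the constructor can be exercised in isolation. The sketch below assumes a hypothetical StubSubAssembly struct in place of a real c.p.SubAssembly, and a hypothetical check_single_tail helper that repeats the two guards from the listing above; neither name exists in cascading.jruby.

```ruby
# Hypothetical stub: only #tails matters for the check.
StubSubAssembly = Struct.new(:tails)

# Hypothetical helper repeating the constructor's two guards.
def check_single_tail(sub_assembly)
  raise 'SubAssembly must call setTails in constructor' unless sub_assembly.tails
  raise 'SubAssembly must set exactly 1 tail in constructor' unless sub_assembly.tails.size == 1
  sub_assembly.tails.first
end

# One tail passes and is returned
tail = check_single_tail(StubSubAssembly.new([:tail]))

# No tails set, or more than one tail, is rejected
errors = [nil, [:a, :b]].map do |tails|
  begin
    check_single_tail(StubSubAssembly.new(tails))
    nil
  rescue RuntimeError => e
    e.message
  end
end
```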