class Cascading::Flow

A Flow wraps a c.f.Flow (cascading.flow.Flow). A Flow is composed of Assemblies, which are constructed by calling the #assembly method within the block passed to the Cascading::flow or Cascading::Cascade#flow constructor. Many Assemblies may be nested within a Flow.

Attributes

incoming_scopes[RW]
listeners[RW]
mode[R]
outgoing_scopes[RW]
properties[R]
sinks[RW]
sources[RW]

Public Instance Methods

add_archive_to_distributed_cache(file)

Adds the given path to the mapred.cache.archives list property.

# File lib/cascading/flow.rb, line 161
def add_archive_to_distributed_cache(file)
  add_to_distributed_cache(file, "mapred.cache.archives")
end
add_file_to_distributed_cache(file)

Adds the given path to the mapred.cache.files list property.

# File lib/cascading/flow.rb, line 156
def add_file_to_distributed_cache(file)
  add_to_distributed_cache(file, "mapred.cache.files")
end
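
Both cache methods delegate to an add_to_distributed_cache helper that is not listed on this page. The sketch below shows one way such a helper could append to a list property. The name append_to_list_property, the comma-separated format, and the example paths are assumptions made for illustration, not the library's implementation.

```ruby
# Hypothetical stand-in for the unlisted add_to_distributed_cache helper.
# Assumption: the property value is a comma-separated list of paths.
def append_to_list_property(properties, property, path)
  existing = properties[property]
  if existing.nil? || existing.empty?
    # First entry: the property holds just this path.
    properties[property] = path
  else
    # Later entries are appended after a comma.
    properties[property] = "#{existing},#{path}"
  end
  properties
end

props = {}
append_to_list_property(props, 'mapred.cache.files', 'hdfs:///tmp/lookup.txt')
append_to_list_property(props, 'mapred.cache.files', 'hdfs:///tmp/stopwords.txt')
puts props['mapred.cache.files']
```
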
add_listener(listener)

Appends a FlowListener to the list of listeners for this flow.

# File lib/cascading/flow.rb, line 166
def add_listener(listener)
  @listeners << listener
end
assembly(name, &block)

Builds a child Assembly in this Flow given a name and block.

An assembly’s name is quite important as it will determine:

  • The sources from which it will read, if any

  • The name to be used in joins or unions downstream

  • The name to be used to sink the output of the assembly downstream

Many assemblies may be built within a flow. The Cascading::Assembly#branch method is used for creating nested assemblies and produces objects of the same type as this constructor.

Example:

flow 'wordcount', :mode => :local do
  assembly 'first_step' do
    ...
  end

  assembly 'second_step' do
    ...
  end
end

# File lib/cascading/flow.rb, line 57
def assembly(name, &block)
  raise "Could not build assembly '#{name}'; block required" unless block_given?
  assembly = Assembly.new(name, self, @outgoing_scopes)
  add_child(assembly)
  assembly.instance_eval(&block)
  assembly
end
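
The listing above requires a block and evaluates it with instance_eval, so bare calls inside the block are sent to the new child. A minimal plain-Ruby sketch of that pattern follows. SketchAssembly, SketchFlow, and the step verb are hypothetical stand-ins, not library classes or methods.

```ruby
# Hypothetical stand-in for an assembly; it only records what the block did.
class SketchAssembly
  attr_reader :name, :steps

  def initialize(name)
    @name = name
    @steps = []
  end

  # Illustrative DSL verb, callable without a receiver inside the block.
  def step(label)
    @steps << label
  end
end

# Hypothetical stand-in for a flow that keeps its children by name.
class SketchFlow
  attr_reader :children

  def initialize
    @children = {}
  end

  def assembly(name, &block)
    raise "Could not build assembly '#{name}'; block required" unless block_given?
    child = SketchAssembly.new(name)
    @children[name] = child
    # instance_eval makes the child the receiver of bare calls in the block.
    child.instance_eval(&block)
    child
  end
end

flow = SketchFlow.new
flow.assembly('first_step') { step 'split lines' }
flow.assembly('second_step') { step 'count words' }
puts flow.children.keys.inspect
```

Calling assembly without a block raises immediately, which mirrors the guard on the first line of the real method.
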
complete()

Completes this Flow after connecting it. This results in execution of the c.f.Flow built from this Flow. Use this method when executing a top-level Flow.

# File lib/cascading/flow.rb, line 205
def complete
  begin
    flow = connect
    @listeners.each { |l| flow.addListener(l) }
    flow.complete
  rescue NativeException => e
    raise CascadingException.new(e, 'Error completing flow')
  end
end
compress_output(codec, type)

Property modifier that sets the codec and type of the compression for all sinks in this flow. Currently only supports o.a.h.i.c.DefaultCodec and o.a.h.i.c.GzipCodec, and the NONE, RECORD, or BLOCK compression types defined in o.a.h.i.SequenceFile.

codec must be one of the symbols :default or :gzip, and type must be one of the symbols :none, :record, or :block. Any other value raises an error.

Example:

compress_output :default, :block

# File lib/cascading/flow.rb, line 134
def compress_output(codec, type)
  properties['mapred.output.compress'] = 'true'
  properties['mapred.output.compression.codec'] = case codec
    when :default then Java::OrgApacheHadoopIoCompress::DefaultCodec.java_class.name
    when :gzip then Java::OrgApacheHadoopIoCompress::GzipCodec.java_class.name
    else raise "Codec #{codec} not yet supported by cascading.jruby"
    end
  properties['mapred.output.compression.type'] = case type
    when :none   then Java::OrgApacheHadoopIo::SequenceFile::CompressionType::NONE.to_s
    when :record then Java::OrgApacheHadoopIo::SequenceFile::CompressionType::RECORD.to_s
    when :block  then Java::OrgApacheHadoopIo::SequenceFile::CompressionType::BLOCK.to_s
    else raise "Compression type '#{type}' not supported"
    end
end
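
To make the symbol-to-property mapping concrete without a JRuby and Hadoop classpath, the following plain-Ruby sketch uses lookup tables. The names CODECS, TYPES, and compression_properties are hypothetical. The string values are what the Java class names and enum constants in the listing above are expected to resolve to.

```ruby
# Illustrative lookup tables; in the library these values come from the
# Hadoop classes and the SequenceFile compression enum via JRuby.
CODECS = {
  default: 'org.apache.hadoop.io.compress.DefaultCodec',
  gzip: 'org.apache.hadoop.io.compress.GzipCodec'
}.freeze

TYPES = { none: 'NONE', record: 'RECORD', block: 'BLOCK' }.freeze

# Hypothetical helper: returns the three properties instead of mutating a flow.
def compression_properties(codec, type)
  {
    'mapred.output.compress' => 'true',
    'mapred.output.compression.codec' =>
      CODECS.fetch(codec) { raise ArgumentError, "Codec #{codec} not supported" },
    'mapred.output.compression.type' =>
      TYPES.fetch(type) { raise ArgumentError, "Compression type '#{type}' not supported" }
  }
end

compression_properties(:default, :block).each { |key, value| puts "#{key}=#{value}" }
```
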
connect()

Connects this Flow, producing a c.f.Flow without completing it (the Flow is not executed). This method is used by Cascade to connect its child Flows. To connect and complete a Flow, see #complete.

# File lib/cascading/flow.rb, line 185
def connect
  puts "Connecting flow '#{name}' with properties:"
  properties.keys.sort.each do |key|
    puts "#{key}=#{properties[key]}"
  end

  # FIXME: why do I have to do this in 2.0 wip-255?
  Java::CascadingProperty::AppProps.setApplicationName(properties, name)
  Java::CascadingProperty::AppProps.setApplicationVersion(properties, '0.0.0')
  Java::CascadingProperty::AppProps.setApplicationJarClass(properties, Java::CascadingFlow::Flow.java_class)

  sources = make_tap_parameter(@sources, :head_pipe)
  sinks = make_tap_parameter(@sinks, :tail_pipe)
  pipes = make_pipes
  mode.connect_flow(properties, name, sources, sinks, pipes)
end
debug_scope(name = nil)

Prints information about the scope of this Flow. With no argument, it prints the scope at the point of the call, which is the scope of the most recently defined child. With a name, it prints the scope of the named child. This allows you to trace the propagation of field names through your job and is handy for debugging. See Scope for details.

# File lib/cascading/flow.rb, line 106
def debug_scope(name = nil)
  scope = scope(name)
  name ||= last_child.name
  puts "Scope for '#{name}':\n  #{scope}"
end
describe(offset = '')

Produces a textual description of this Flow. The description details the structure of the Flow, its sources and sinks, and the input and output fields of each Assembly. The offset parameter is a prefix added to every line, so a caller can nest this description inside its own and indent the structural hierarchy of a job.

# File lib/cascading/flow.rb, line 84
def describe(offset = '')
  description =  "#{offset}#{name}:flow\n"
  description += "#{sources.keys.map{ |source| "#{offset}  #{source}:source :: #{incoming_scopes[source].values_fields.to_a.inspect}" }.join("\n")}\n"
  description += "#{child_names.map{ |child| children[child].describe("#{offset}  ") }.join("\n")}\n"
  description += "#{sinks.keys.map{ |sink| "#{offset}  #{sink}:sink :: #{outgoing_scopes[sink].values_fields.to_a.inspect}" }.join("\n")}"
  description
end
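
The offset mechanism can be sketched in isolation: each level passes its own offset plus two spaces to its children. SketchNode and the node names below are hypothetical, and the output is simplified to names only, with no sources, sinks, or fields.

```ruby
# Hypothetical node type that mimics only the indentation behaviour.
SketchNode = Struct.new(:name, :kind, :children) do
  def describe(offset = '')
    lines = ["#{offset}#{name}:#{kind}"]
    # Children are described with a deeper offset, which indents them.
    children.each { |child| lines << child.describe("#{offset}  ") }
    lines.join("\n")
  end
end

tree = SketchNode.new('wordcount', 'flow', [
  SketchNode.new('first_step', 'assembly', []),
  SketchNode.new('second_step', 'assembly', [])
])

puts tree.describe
# prints:
# wordcount:flow
#   first_step:assembly
#   second_step:assembly
```
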
emr_local_path_for_distributed_cache_file(file)

Handles locating a file cached from S3 on local disk. TODO: remove

# File lib/cascading/flow.rb, line 171
def emr_local_path_for_distributed_cache_file(file)
  # NOTE this needs to be *appended* to the property mapred.local.dir
  if file =~ /^s3n?:\/\//
    # EMR
    "/taskTracker/archive/#{file.gsub(/^s3n?:\/\//, "")}"
  else
    # Local
    file
  end
end
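
Because this method depends only on its argument, its behaviour can be checked standalone. The body below restates the listing as a top-level method. The bucket and file names are made up for illustration.

```ruby
# Same logic as the listing above, restated as a standalone method.
def emr_local_path_for_distributed_cache_file(file)
  if file =~ /^s3n?:\/\//
    # S3 URI: strip the scheme and prefix the task tracker archive directory.
    "/taskTracker/archive/#{file.gsub(/^s3n?:\/\//, '')}"
  else
    # Any other path is returned unchanged.
    file
  end
end

puts emr_local_path_for_distributed_cache_file('s3n://my-bucket/dict/words.txt')
# prints /taskTracker/archive/my-bucket/dict/words.txt
puts emr_local_path_for_distributed_cache_file('/tmp/dict/words.txt')
# prints /tmp/dict/words.txt
```
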
scope(name = nil)

Accesses the outgoing scope of this Flow. With no argument, it returns the scope at the point of the call, which is the outgoing scope of the most recently defined child. With a name, it returns the outgoing scope of the named child. This is useful for grabbing the values_fields at any point in the construction of the Flow. See Scope for details.

# File lib/cascading/flow.rb, line 96
def scope(name = nil)
  raise 'Must specify name if no children have been defined yet' unless name || last_child
  name ||= last_child.name
  @outgoing_scopes[name]
end
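
The default-to-last-child lookup can be sketched with plain data. ScopeLookup is a hypothetical stand-in in which each scope is reduced to an array of field names. The real method returns Scope objects.

```ruby
# Hypothetical stand-in: scopes are plain arrays of field names.
ScopeLookup = Struct.new(:outgoing_scopes, :child_names) do
  def scope(name = nil)
    last_child = child_names.last
    raise 'Must specify name if no children have been defined yet' unless name || last_child
    # Fall back to the most recently defined child when no name is given.
    outgoing_scopes[name || last_child]
  end
end

lookup = ScopeLookup.new(
  { 'first_step' => %w[line], 'second_step' => %w[word count] },
  %w[first_step second_step]
)

p lookup.scope               # fields after the most recent child
p lookup.scope('first_step') # fields after a named child
```
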
set_spill_threshold(threshold)

Sets the cascading.spill.list.threshold property in this flow’s properties. The value is stored as a string. See c.t.c.SpillableProps for details.

# File lib/cascading/flow.rb, line 151
def set_spill_threshold(threshold)
  properties['cascading.spill.list.threshold'] = threshold.to_s
end
sink(name, tap)

Creates a new sink for this flow, using the specified name and Cascading::Tap.

# File lib/cascading/flow.rb, line 75
def sink(name, tap)
  sinks[name] = tap
end
sink_metadata()

Builds a map, keyed by sink name, of the sink metadata for each sink. Currently, this contains only the field names of each sink.

# File lib/cascading/flow.rb, line 114
def sink_metadata
  @sinks.keys.inject({}) do |sink_metadata, sink_name|
    raise "Cannot sink undefined assembly '#{sink_name}'" unless @outgoing_scopes[sink_name]
    sink_metadata[sink_name] = {
      :field_names => @outgoing_scopes[sink_name].values_fields.to_a,
    }
    sink_metadata
  end
end
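
The shape of the returned map can be illustrated with plain hashes. In this sketch, sketch_sink_metadata is a hypothetical helper, taps are placeholder strings, and each outgoing scope is reduced to its array of field names.

```ruby
# Hypothetical helper mirroring the structure of the listing above.
def sketch_sink_metadata(sinks, outgoing_scopes)
  sinks.keys.each_with_object({}) do |sink_name, metadata|
    fields = outgoing_scopes[sink_name]
    # A sink must be named after an assembly (or source) that exists.
    raise "Cannot sink undefined assembly '#{sink_name}'" unless fields
    metadata[sink_name] = { field_names: fields }
  end
end

sinks = { 'second_step' => 'placeholder tap' }
scopes = { 'first_step' => %w[line], 'second_step' => %w[word count] }
p sketch_sink_metadata(sinks, scopes)
```
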
source(name, tap)

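Creates a new source for this flow, using the specified name and Cascading::Tap. The source’s scope is registered as both an incoming and an outgoing scope under that name, so an assembly with the same name reads from it.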

# File lib/cascading/flow.rb, line 67
def source(name, tap)
  sources[name] = tap
  incoming_scopes[name] = Scope.source_scope(name, mode.source_tap(name, tap), @flow_scope)
  outgoing_scopes[name] = incoming_scopes[name]
end

Public Class Methods

new(name, parent, options = {})

Do not use this constructor directly. Instead, use Cascading::flow to build top-level flows and Cascading::Cascade#flow to build flows within a Cascade.

Builds a Flow given a name and a parent node (a Cascade or nil).

The named options are:

properties

Properties hash which allows external configuration of this flow. The flow will side-effect the properties during composition, then pass the modified properties along to the FlowConnector for execution. See Cascade#initialize for details on how properties are propagated through cascades.

mode

Mode which will determine the execution mode of this flow. See Cascading::Mode.parse for details.

# File lib/cascading/flow.rb, line 27
def initialize(name, parent, options = {})
  @sources, @sinks, @incoming_scopes, @outgoing_scopes, @listeners = {}, {}, {}, {}, []
  @properties = options[:properties] || {}
  @mode = Mode.parse(options[:mode])
  @flow_scope = Scope.flow_scope(name)
  super(name, parent)
  self.class.add(name, self)
end
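
Because the constructor stores the caller’s properties hash itself rather than a copy, property modifiers on the flow are visible through the caller’s reference. A minimal sketch of that side effect follows. PropertySketch is a hypothetical stand-in that reproduces only the properties handling, and the mapred.reduce.tasks entry is example data.

```ruby
# Hypothetical stand-in reproducing only the properties handling.
class PropertySketch
  attr_reader :properties

  def initialize(options = {})
    # The caller's hash is kept as-is, not duplicated.
    @properties = options[:properties] || {}
  end

  def set_spill_threshold(threshold)
    properties['cascading.spill.list.threshold'] = threshold.to_s
  end
end

shared = { 'mapred.reduce.tasks' => '4' }
flow = PropertySketch.new(properties: shared)
flow.set_spill_threshold(10_000)

puts shared['cascading.spill.list.threshold']
# prints 10000
```

Pass a copy (for example shared.dup) if the caller’s hash must stay unchanged.
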