A Flow wraps a c.f.Flow. A Flow is composed of Assemblies, which are constructed using the #assembly method within the block passed to the Cascading::flow or Cascading::Cascade#flow constructor. Many Assemblies may be nested within a Flow.
Adds the given path to the mapred.cache.archives list property.
# File lib/cascading/flow.rb, line 161
def add_archive_to_distributed_cache(file)
  add_to_distributed_cache(file, "mapred.cache.archives")
end
Adds the given path to the mapred.cache.files list property.
# File lib/cascading/flow.rb, line 156
def add_file_to_distributed_cache(file)
  add_to_distributed_cache(file, "mapred.cache.files")
end
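The helper add_to_distributed_cache is not shown on this page. As a rough sketch, assuming it appends the path to a comma-separated list (the format Hadoop itself uses for mapred.cache.files and mapred.cache.archives), the two methods above reduce to something like the following. The standalone function and the plain-hash properties are hypothetical stand-ins for the Flow's own state.

```ruby
# Hypothetical stand-in for the private helper add_to_distributed_cache.
# Assumption: the property value is a comma-separated list of paths, which is
# the format Hadoop uses for mapred.cache.files and mapred.cache.archives.
def add_to_distributed_cache(properties, file, property)
  existing = properties[property]
  properties[property] =
    if existing.nil? || existing.empty?
      file                    # first entry: the list is just this path
    else
      "#{existing},#{file}"   # later entries are appended after a comma
    end
end

properties = {}
add_to_distributed_cache(properties, 'hdfs:///cache/stopwords.txt', 'mapred.cache.files')
add_to_distributed_cache(properties, 'hdfs:///cache/dict.txt', 'mapred.cache.files')
puts properties['mapred.cache.files']
# prints hdfs:///cache/stopwords.txt,hdfs:///cache/dict.txt
```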
Appends a FlowListener to the list of listeners for this flow.
# File lib/cascading/flow.rb, line 166
def add_listener(listener)
  @listeners << listener
end
Builds a child Assembly in this Flow given a name and block.
An assembly’s name is important because it determines:
The sources from which it will read, if any
The name to be used in joins or unions downstream
The name to be used to sink the output of the assembly downstream
Many assemblies may be built within a flow. The Cascading::Assembly#branch method is used for creating nested assemblies and produces objects of the same type as this constructor.
Example:
flow 'wordcount', :mode => :local do
  assembly 'first_step' do
    ...
  end
  assembly 'second_step' do
    ...
  end
end
# File lib/cascading/flow.rb, line 57
def assembly(name, &block)
  raise "Could not build assembly '#{name}'; block required" unless block_given?
  assembly = Assembly.new(name, self, @outgoing_scopes)
  add_child(assembly)
  assembly.instance_eval(&block)
  assembly
end
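Because #assembly runs the block with instance_eval on the new Assembly, calls inside the block resolve against that child rather than against the Flow. The toy classes below (SketchFlow, SketchAssembly and its step method are hypothetical, not part of cascading.jruby) mimic only that pattern, the block-required check, and the registration of each child under its name.

```ruby
# Hypothetical child type; step just records what the block asked for.
class SketchAssembly
  attr_reader :name, :steps

  def initialize(name)
    @name = name
    @steps = []
  end

  def step(description)
    @steps << description
  end
end

# Hypothetical parent mirroring the shape of Flow#assembly.
class SketchFlow
  attr_reader :children

  def initialize
    @children = {}
  end

  def assembly(name, &block)
    raise "Could not build assembly '#{name}'; block required" unless block_given?
    assembly = SketchAssembly.new(name)
    @children[name] = assembly       # register the child under its name
    assembly.instance_eval(&block)   # self inside the block is the child
    assembly
  end
end

flow = SketchFlow.new
flow.assembly('first_step') { step 'split lines into words' }
flow.assembly('second_step') { step 'count words' }
p flow.children.keys                 # ["first_step", "second_step"]
p flow.children['first_step'].steps  # ["split lines into words"]
```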
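Connects this Flow and then completes it, which executes the c.f.Flow built from this Flow. Listeners registered with #add_listener are attached to the c.f.Flow before it runs, and a NativeException raised along the way is rethrown as a CascadingException. Use this method when executing a top-level Flow.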
# File lib/cascading/flow.rb, line 205
def complete
  begin
    flow = connect
    @listeners.each { |l| flow.addListener(l) }
    flow.complete
  rescue NativeException => e
    raise CascadingException.new(e, 'Error completing flow')
  end
end
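One detail visible in the source: listeners added with #add_listener are attached only inside #complete, so a Flow connected through #connect alone does not pick them up this way. The toy below reproduces the connect, attach, complete and wrap sequence under stated simplifications: SketchFlow, SketchConnectedFlow and WrappedError are hypothetical, listeners are plain lambdas rather than FlowListeners, and RuntimeError stands in for NativeException.

```ruby
# Hypothetical error wrapper standing in for CascadingException.
class WrappedError < StandardError
  def initialize(original, message)
    super("#{message}: #{original.message}")
  end
end

# Stand-in for the connected c.f.Flow: it notifies listeners as it runs.
class SketchConnectedFlow
  def initialize(fail)
    @fail = fail
    @listeners = []
  end

  def add_listener(listener)
    @listeners << listener
  end

  def complete
    @listeners.each { |l| l.call(:starting) }
    raise 'job failed' if @fail
    @listeners.each { |l| l.call(:completed) }
  end
end

class SketchFlow
  def initialize(fail: false)
    @fail = fail
    @listeners = []
  end

  def add_listener(listener)
    @listeners << listener
  end

  # Like Flow#connect: builds the runnable flow but attaches no listeners.
  def connect
    SketchConnectedFlow.new(@fail)
  end

  # Like Flow#complete: connect, attach listeners, run, and wrap failures.
  def complete
    flow = connect
    @listeners.each { |l| flow.add_listener(l) }
    flow.complete
  rescue RuntimeError => e
    raise WrappedError.new(e, 'Error completing flow')
  end
end

events = []
sketch = SketchFlow.new
sketch.add_listener(->(event) { events << event })
sketch.complete
p events  # [:starting, :completed]
```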
Property modifier that sets the codec and type of compression for all sinks in this flow. Currently only o.a.h.i.c.DefaultCodec and o.a.h.i.c.GzipCodec are supported, along with the NONE, RECORD, and BLOCK compression types defined in o.a.h.i.SequenceFile.
codec must be :default or :gzip, and type must be :none, :record, or :block; any other value raises an error.
Example:
compress_output :default, :block
# File lib/cascading/flow.rb, line 134
def compress_output(codec, type)
  properties['mapred.output.compress'] = 'true'
  properties['mapred.output.compression.codec'] = case codec
    when :default then Java::OrgApacheHadoopIoCompress::DefaultCodec.java_class.name
    when :gzip then Java::OrgApacheHadoopIoCompress::GzipCodec.java_class.name
    else raise "Codec #{codec} not yet supported by cascading.jruby"
    end
  properties['mapred.output.compression.type'] = case type
    when :none then Java::OrgApacheHadoopIo::SequenceFile::CompressionType::NONE.to_s
    when :record then Java::OrgApacheHadoopIo::SequenceFile::CompressionType::RECORD.to_s
    when :block then Java::OrgApacheHadoopIo::SequenceFile::CompressionType::BLOCK.to_s
    else raise "Compression type '#{type}' not supported"
    end
end
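To see exactly which properties end up set, the pure-Ruby sketch below reproduces the mapping in #compress_output with the class names written out as strings; the real method reads them through JRuby (for example DefaultCodec.java_class.name), and the SequenceFile.CompressionType constants print as their enum names. The compression_properties function is hypothetical. Unlike the original, which sets mapred.output.compress before validating its arguments, it validates first and returns a new hash.

```ruby
# Symbol => fully qualified Hadoop codec class name.
CODECS = {
  :default => 'org.apache.hadoop.io.compress.DefaultCodec',
  :gzip    => 'org.apache.hadoop.io.compress.GzipCodec',
}.freeze

# Symbol => SequenceFile.CompressionType enum name.
COMPRESSION_TYPES = { :none => 'NONE', :record => 'RECORD', :block => 'BLOCK' }.freeze

# Hypothetical helper returning the properties compress_output would write.
def compression_properties(codec, type)
  raise "Codec #{codec} not yet supported by cascading.jruby" unless CODECS.key?(codec)
  raise "Compression type '#{type}' not supported" unless COMPRESSION_TYPES.key?(type)
  {
    'mapred.output.compress' => 'true',
    'mapred.output.compression.codec' => CODECS[codec],
    'mapred.output.compression.type' => COMPRESSION_TYPES[type],
  }
end

compression_properties(:default, :block).each { |key, value| puts "#{key}=#{value}" }
# mapred.output.compress=true
# mapred.output.compression.codec=org.apache.hadoop.io.compress.DefaultCodec
# mapred.output.compression.type=BLOCK
```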
Connects this Flow, producing a c.f.Flow without completing it (the Flow is not executed). This method is used by Cascade to connect its child Flows. To connect and complete a Flow, see #complete.
# File lib/cascading/flow.rb, line 185
def connect
  puts "Connecting flow '#{name}' with properties:"
  properties.keys.sort.each do |key|
    puts "#{key}=#{properties[key]}"
  end
  # FIXME: why do I have to do this in 2.0 wip-255?
  Java::CascadingProperty::AppProps.setApplicationName(properties, name)
  Java::CascadingProperty::AppProps.setApplicationVersion(properties, '0.0.0')
  Java::CascadingProperty::AppProps.setApplicationJarClass(properties, Java::CascadingFlow::Flow.java_class)
  sources = make_tap_parameter(@sources, :head_pipe)
  sinks = make_tap_parameter(@sinks, :tail_pipe)
  pipes = make_pipes
  mode.connect_flow(properties, name, sources, sinks, pipes)
end
Prints a scope of this Flow for debugging. With no argument, it prints the outgoing scope of the most recently defined child at the point where it is called; given a name, it prints the scope registered under that name. This lets you trace the propagation of field names through your job. See Scope for details.
# File lib/cascading/flow.rb, line 106
def debug_scope(name = nil)
  scope = scope(name)
  name ||= last_child.name
  puts "Scope for '#{name}':\n #{scope}"
end
Produces a textual description of this Flow, detailing its structure, its sources and sinks, and the input and output fields of each Assembly. The offset parameter is a prefix prepended to every line, which lets this description be nested within a calling context and indents the structural hierarchy of a job.
# File lib/cascading/flow.rb, line 84
def describe(offset = '')
  description = "#{offset}#{name}:flow\n"
  description += "#{sources.keys.map{ |source| "#{offset} #{source}:source :: #{incoming_scopes[source].values_fields.to_a.inspect}" }.join("\n")}\n"
  description += "#{child_names.map{ |child| children[child].describe("#{offset} ") }.join("\n")}\n"
  description += "#{sinks.keys.map{ |sink| "#{offset} #{sink}:sink :: #{outgoing_scopes[sink].values_fields.to_a.inspect}" }.join("\n")}"
  description
end
Maps a file cached from S3 to its location on local disk under EMR's task tracker archive directory. TODO: remove
# File lib/cascading/flow.rb, line 171
def emr_local_path_for_distributed_cache_file(file)
  # NOTE this needs to be *appended* to the property mapred.local.dir
  if file =~ /^s3n?:\/\//
    # EMR
    "/taskTracker/archive/#{file.gsub(/^s3n?:\/\//, "")}"
  else
    # Local
    file
  end
end
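The S3 branch simply strips the s3:// or s3n:// scheme and places the remainder under /taskTracker/archive/. A standalone sketch with sample inputs follows; emr_local_path is a hypothetical name, and it assumes that for non-S3 paths the method returns the path unchanged. Per the NOTE in the source, the EMR result is meant to be appended to mapred.local.dir, which this sketch does not do.

```ruby
# Hypothetical standalone version of emr_local_path_for_distributed_cache_file.
# Assumption: non-S3 paths are returned unchanged.
S3_SCHEME = /^s3n?:\/\//

def emr_local_path(file)
  if file =~ S3_SCHEME
    "/taskTracker/archive/#{file.gsub(S3_SCHEME, '')}"  # drop the scheme, keep bucket/key
  else
    file
  end
end

puts emr_local_path('s3n://my-bucket/lib/stopwords.txt')
# prints /taskTracker/archive/my-bucket/lib/stopwords.txt
puts emr_local_path('/tmp/stopwords.txt')
# prints /tmp/stopwords.txt
```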
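Accesses an outgoing scope of this Flow. With no argument, it returns the outgoing scope of the most recently defined child at the point where it is called; given a name, it returns the scope registered under that name. It raises an error if no name is given and no children have been defined yet. This is useful for grabbing the values_fields at any point in the construction of the Flow. See Scope for details.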
# File lib/cascading/flow.rb, line 96
def scope(name = nil)
  raise 'Must specify name if no children have been defined yet' unless name || last_child
  name ||= last_child.name
  @outgoing_scopes[name]
end
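The defaulting rule in #scope (and therefore in #debug_scope) can be modeled with plain hashes. In the toy below, ScopeLookup is hypothetical and scopes are arrays of field names instead of Scope objects. As in #source, a source registers an outgoing scope without becoming a child, so it is never the default and must be requested by name.

```ruby
# Hypothetical model of Flow#scope; scopes are plain arrays of field names.
class ScopeLookup
  def initialize
    @outgoing_scopes = {}
    @last_child = nil
  end

  # Sources get an outgoing scope but are not children.
  def add_source(name, fields)
    @outgoing_scopes[name] = fields
  end

  # Children (assemblies) get a scope and become the default for #scope.
  def add_child(name, fields)
    @outgoing_scopes[name] = fields
    @last_child = name
  end

  def scope(name = nil)
    raise 'Must specify name if no children have been defined yet' unless name || @last_child
    name ||= @last_child
    @outgoing_scopes[name]
  end
end

lookup = ScopeLookup.new
lookup.add_source('input', ['line'])
p lookup.scope('input')  # ["line"]
lookup.add_child('words', ['word'])
p lookup.scope           # ["word"]
```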
Sets the cascading.spill.list.threshold property in this flow’s properties. See c.t.c.SpillableProps for details.
# File lib/cascading/flow.rb, line 151
def set_spill_threshold(threshold)
  properties['cascading.spill.list.threshold'] = threshold.to_s
end
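Creates a sink for this flow with the given name and Cascading::Tap. The name should match the assembly whose output is sunk; #sink_metadata raises an error for any sink name that has no matching outgoing scope.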
# File lib/cascading/flow.rb, line 75
def sink(name, tap)
  sinks[name] = tap
end
Builds a map, keyed by sink name, of the sink metadata for each sink. Currently, this contains only the field names of each sink.
# File lib/cascading/flow.rb, line 114
def sink_metadata
  @sinks.keys.inject({}) do |sink_metadata, sink_name|
    raise "Cannot sink undefined assembly '#{sink_name}'" unless @outgoing_scopes[sink_name]
    sink_metadata[sink_name] = {
      :field_names => @outgoing_scopes[sink_name].values_fields.to_a,
    }
    sink_metadata
  end
end
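The shape of the returned map is easiest to see with stand-in scopes. In the sketch below, ToyScope is a hypothetical Struct carrying only values_fields, and sinks and scopes are passed in as hashes instead of being read from the Flow. each_with_object replaces the original inject; the resulting hash is the same.

```ruby
# Hypothetical stand-in for Cascading::Scope with just a values_fields list.
ToyScope = Struct.new(:values_fields)

# Sketch of Flow#sink_metadata over explicit sinks and outgoing scopes.
def sink_metadata(sinks, outgoing_scopes)
  sinks.keys.each_with_object({}) do |sink_name, metadata|
    raise "Cannot sink undefined assembly '#{sink_name}'" unless outgoing_scopes[sink_name]
    metadata[sink_name] = { :field_names => outgoing_scopes[sink_name].values_fields.to_a }
  end
end

scopes = { 'counts' => ToyScope.new(['word', 'count']) }
metadata = sink_metadata({ 'counts' => :some_tap }, scopes)
puts metadata['counts'][:field_names].inspect  # ["word", "count"]
```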
Creates a source for this flow with the given name and Cascading::Tap, registering the source's scope under that name as both an incoming and an outgoing scope.
# File lib/cascading/flow.rb, line 67
def source(name, tap)
  sources[name] = tap
  incoming_scopes[name] = Scope.source_scope(name, mode.source_tap(name, tap), @flow_scope)
  outgoing_scopes[name] = incoming_scopes[name]
end
Do not use this constructor directly. Instead, use Cascading::flow to build top-level flows and Cascading::Cascade#flow to build flows within a Cascade.
Builds a Flow given a name and a parent node (a Cascade or nil).
The named options are:
:properties - A properties hash which allows external configuration of this flow (an empty hash if omitted). The flow modifies the properties during composition, then passes the modified properties along to the FlowConnector for execution. See Cascade#initialize for details on how properties are propagated through cascades.
:mode - Determines the execution mode of this flow. See Cascading::Mode.parse for details.
# File lib/cascading/flow.rb, line 27
def initialize(name, parent, options = {})
  @sources, @sinks, @incoming_scopes, @outgoing_scopes, @listeners = {}, {}, {}, {}, []
  @properties = options[:properties] || {}
  @mode = Mode.parse(options[:mode])
  @flow_scope = Scope.flow_scope(name)
  super(name, parent)
  self.class.add(name, self)
end
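Note that #initialize stores the :properties hash itself (options[:properties] || {}) rather than a copy. Assuming properties simply returns @properties, modifiers such as #set_spill_threshold therefore write into the caller's hash. The toy MiniFlow below is hypothetical and keeps only the lines needed to show this aliasing.

```ruby
# Hypothetical reduction of Flow#initialize and Flow#set_spill_threshold.
class MiniFlow
  attr_reader :properties

  def initialize(name, options = {})
    @name = name
    @properties = options[:properties] || {}  # same object the caller passed in
  end

  def set_spill_threshold(threshold)
    properties['cascading.spill.list.threshold'] = threshold.to_s
  end
end

shared = { 'mapred.reduce.tasks' => '4' }
mini = MiniFlow.new('wordcount', :properties => shared)
mini.set_spill_threshold(50_000)
puts shared['cascading.spill.list.threshold']  # prints 50000
```

If a caller needs its hash left untouched, passing a copy (for example shared.dup) avoids the side effect.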