The Cascading module contains all of the cascading.jruby DSL. Inserting the following into your script:
  require 'rubygems'
  require 'cascading'
includes this module at the top level, making all of its features available.
To build a dataflow like the one in the README.md or samples, start by looking at Cascade or Flow. These are the highest level structures you’ll use to put together your job.
Within a flow, you’ll connect sources to sinks by way of Assembly, which refers to “pipe assemblies” from Cascading. Within an Assembly, you’ll use functions and filters (see Operations, IdentityOperations, RegexOperations, FilterOperations, and TextOperations) as well as Cascading::Assembly#group_by, Cascading::Assembly#union, and Cascading::Assembly#join. You can provide those last pipes with a block that can select operations from Aggregations.
Finally, you’ll want to address the execution of your job, whether it be locally testing or running remotely on a Hadoop cluster. See the Mode class for the available modes, and parameterize your script such that it can operate in Cascading local mode locally and in Hadoop mode when run in a jar produced with Jading.
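One way to parameterize the mode is sketched below in plain Ruby. The `CASCADING_MODE` environment variable name is hypothetical, not part of the cascading.jruby API; the resulting symbol would be passed as the :mode option when building a cascade or flow.

```ruby
# Hedged sketch: pick an execution mode from the environment, defaulting to
# Cascading local mode. CASCADING_MODE is an assumed variable name.
def select_mode(env = ENV)
  (env['CASCADING_MODE'] || 'local').to_sym
end

# The mode would then be used like:
#   cascade 'wordcount', :mode => select_mode do ... end
```

Running the same script in a Jading-built jar with `CASCADING_MODE=hadoop` set would then select Hadoop mode without any code change.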
Mapping that defines a convenient syntax for specifying Java classes, used in Janino expressions and elsewhere.
Convenience method wrapping c.t.Fields::ALL
  # File lib/cascading/cascading.rb, line 147
  def all_fields
    Java::CascadingTuple::Fields::ALL
  end
Builds a top-level Cascade given a name and a block.
The named options are passed through to the Cascade; see Cascade#initialize for the supported options.
Example:
  cascade 'wordcount', :mode => :local do
    flow 'first_step' do
      ...
    end

    flow 'second_step' do
      ...
    end
  end
  # File lib/cascading/cascading.rb, line 70
  def cascade(name, options = {}, &block)
    raise "Could not build cascade '#{name}'; block required" unless block_given?
    raise 'Cascading::cascade does not accept the :properties param only the global $jobconf_properties' if options[:properties]
    options[:properties] = $jobconf_properties.dup if defined?($jobconf_properties) && $jobconf_properties

    cascade = Cascade.new(name, options)
    cascade.instance_eval(&block)
    cascade
  end
Helper used by #dedup_fields that operates on arrays of field names rather than fields objects.
Example:
  left_names = ['a', 'b']
  mid_names = ['a', 'c']
  right_names = ['a', 'd']

  deduped_names = dedup_field_names(left_names, mid_names, right_names)
  # deduped_names == ['a', 'b', 'a_', 'c', 'a__', 'd']
  # File lib/cascading/cascading.rb, line 185
  def dedup_field_names(*names)
    names.inject([]) do |acc, arr|
      acc + arr.map{ |e| search_field_name(acc, e) }
    end
  end
Combines fields deduplicating them with trailing underscores as necessary. This is used in joins to avoid requiring the caller to unique fields before they are joined.
  # File lib/cascading/cascading.rb, line 171
  def dedup_fields(*fields)
    raise 'Can only be applied to declarators' unless fields.all?{ |f| f.is_declarator? }
    fields(dedup_field_names(*fields.map{ |f| f.to_a }))
  end
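The trailing-underscore behavior can be illustrated with a pure-Ruby sketch that does not require the gem. It assumes search_field_name appends underscores to a candidate name until it no longer collides with the names accumulated so far, which matches the dedup_field_names example above.

```ruby
# Pure-Ruby sketch of the deduplication used by dedup_field_names; the
# search_field_name behavior here is an assumption based on its usage.
def search_field_name(names, candidate)
  names.include?(candidate) ? search_field_name(names, "#{candidate}_") : candidate
end

def dedup_field_names(*names)
  names.inject([]) do |acc, arr|
    acc + arr.map{ |e| search_field_name(acc, e) }
  end
end

dedup_field_names(['a', 'b'], ['a', 'c'], ['a', 'd'])
# => ['a', 'b', 'a_', 'c', 'a__', 'd']
```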
Produces a textual description of all Cascades in the global registry. The description details the structure of the Cascades, the sources and sinks of each Flow, and the input and output fields of each Assembly.
NOTE: will be moved to Job in a later version
  # File lib/cascading/cascading.rb, line 114
  def describe
    Cascade.all.map{ |cascade| cascade.describe }.join("\n")
  end
Computes fields formed by removing remove_fields from base_fields. Operates only on named fields, not positional fields.
Example:
  base_fields = fields(['a', 'b', 'c'])
  remove_fields = fields(['b'])
  result_fields = difference_fields(base_fields, remove_fields)
  # result_fields.to_a == ['a', 'c']
  # File lib/cascading/cascading.rb, line 164
  def difference_fields(base_fields, remove_fields)
    fields(base_fields.to_a - remove_fields.to_a)
  end
Convenience method wrapping ExprStub.expr for building Janino expressions (see ExprStub).

  # File lib/cascading/cascading.rb, line 120
  def expr(expression, options = {})
    ExprStub.expr(expression, options)
  end
Utility method for creating Cascading c.t.Fields from a field name (string) or list of field names (array of strings). If the input fields is already a c.t.Fields or nil, it is passed through. This allows for flexible use of the method at multiple layers in the DSL.
Example:
  cascading_fields = fields(['first', 'second', 'third'])
  # cascading_fields.to_a == ['first', 'second', 'third']
  # File lib/cascading/cascading.rb, line 132
  def fields(fields)
    if fields.nil?
      return nil
    elsif fields.is_a? Java::CascadingTuple::Fields
      return fields
    elsif fields.is_a? ::Array
      if fields.size == 1
        return fields(fields[0])
      end
      raise "Fields cannot be nil: #{fields.inspect}" if fields.include?(nil)
    end
    return Java::CascadingTuple::Fields.new([fields].flatten.map{ |f| f.kind_of?(Fixnum) ? java.lang.Integer.new(f) : f }.to_java(java.lang.Comparable))
  end
Builds a top-level Flow given a name and block for applications built of flows with no cascades.
The named options are passed through to the Flow; see Flow#initialize for the supported options.
Example:
  flow 'wordcount', :mode => :local do
    assembly 'first_step' do
      ...
    end

    assembly 'second_step' do
      ...
    end
  end
  # File lib/cascading/cascading.rb, line 98
  def flow(name, options = {}, &block)
    raise "Could not build flow '#{name}'; block required" unless block_given?
    raise 'Cascading::flow does not accept the :properties param only the global $jobconf_properties' if options[:properties]
    options[:properties] = $jobconf_properties.dup if defined?($jobconf_properties) && $jobconf_properties

    flow = Flow.new(name, nil, options)
    flow.instance_eval(&block)
    flow
  end
Convenience method wrapping c.t.Fields::VALUES
  # File lib/cascading/cascading.rb, line 152
  def last_grouping_fields
    Java::CascadingTuple::Fields::VALUES
  end
Constructs properties to be passed to Cascading::Flow#complete or Cascading::Cascade#complete which will locate temporary Hadoop files in base_dir. It is necessary to pass these properties only when executing scripts in Hadoop local mode via JRuby’s main method, which confuses Cascading’s attempt to find the containing jar. When using Cascading local mode, these are unnecessary.
  # File lib/cascading/cascading.rb, line 264
  def local_properties(base_dir)
    dirs = {
      'test.build.data' => "#{base_dir}/build",
      'hadoop.tmp.dir' => "#{base_dir}/tmp",
      'hadoop.log.dir' => "#{base_dir}/log",
    }
    dirs.each{ |key, dir| `mkdir -p #{dir}` }

    job_conf = Java::OrgApacheHadoopMapred::JobConf.new
    job_conf.jar = dirs['test.build.data']
    dirs.each{ |key, dir| job_conf.set(key, dir) }

    job_conf.num_map_tasks = 1
    job_conf.num_reduce_tasks = 1

    properties = java.util.HashMap.new
    Java::CascadingFlowHadoopPlanner::HadoopPlanner.copy_job_conf(properties, job_conf)
    properties
  end
Convenience access to Cascading::MultiTap.multi_sink_tap. This constructor is more “DSL-like” because it allows you to pass taps directly as actual args rather than in an array:
  multi_sink_tap tap1, tap2, tap3, ..., tapn
See Cascading::MultiTap.multi_sink_tap for more details.
  # File lib/cascading/cascading.rb, line 249
  def multi_sink_tap(*taps)
    MultiTap.multi_sink_tap(taps)
  end
Convenience access to Cascading::MultiTap.multi_source_tap. This constructor is more “DSL-like” because it allows you to pass taps directly as actual args rather than in an array:
  multi_source_tap tap1, tap2, tap3, ..., tapn
See Cascading::MultiTap.multi_source_tap for more details.
  # File lib/cascading/cascading.rb, line 239
  def multi_source_tap(*taps)
    MultiTap.multi_source_tap(taps)
  end
Creates a c.s.h.SequenceFile scheme instance from the specified fields. A local SequenceFile scheme is not provided by Cascading, so this scheme cannot be used in Cascading local mode.
  # File lib/cascading/cascading.rb, line 226
  def sequence_file_scheme(*fields)
    {
      :local_scheme => nil,
      :hadoop_scheme => Java::CascadingSchemeHadoop::SequenceFile.new(fields.empty? ? all_fields : fields(fields)),
    }
  end
Creates a TextLine scheme (can be used in both Cascading local and Hadoop modes). Positional args are used as source fields if :source_fields is not provided.

The named options are:

:source_fields - Fields to be read from a source with this scheme. Defaults to ['offset', 'line'].

:sink_fields - Fields to be written to a sink with this scheme. Defaults to all_fields.

:compression - A symbol, either :enable or :disable, that governs the TextLine scheme's compression. Defaults to the default TextLine compression (only applies to c.s.h.TextLine).
  # File lib/cascading/cascading.rb, line 207
  def text_line_scheme(*args_with_options)
    options, source_fields = args_with_options.extract_options!, args_with_options
    source_fields = fields(options[:source_fields] || (source_fields.empty? ? ['offset', 'line'] : source_fields))
    sink_fields = fields(options[:sink_fields]) || all_fields

    sink_compression = case options[:compression]
      when :enable then Java::CascadingSchemeHadoop::TextLine::Compress::ENABLE
      when :disable then Java::CascadingSchemeHadoop::TextLine::Compress::DISABLE
      else Java::CascadingSchemeHadoop::TextLine::Compress::DEFAULT
    end

    {
      :local_scheme => Java::CascadingSchemeLocal::TextLine.new(source_fields, sink_fields),
      :hadoop_scheme => Java::CascadingSchemeHadoop::TextLine.new(source_fields, sink_fields, sink_compression),
    }
  end
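The positional-args-plus-trailing-options idiom used above can be sketched in plain Ruby: when the last argument is a Hash it is peeled off as named options (akin to ActiveSupport's Array#extract_options!), and any remaining positional args become the source fields. The helper names here are illustrative, not part of the cascading.jruby API.

```ruby
# Sketch of the trailing-options idiom, under the assumption that a trailing
# Hash argument carries the named options.
def extract_options!(args)
  args.last.is_a?(::Hash) ? args.pop : {}
end

def resolve_source_fields(*args_with_options)
  options = extract_options!(args_with_options)
  options[:source_fields] || (args_with_options.empty? ? ['offset', 'line'] : args_with_options)
end

resolve_source_fields                       # => ['offset', 'line']
resolve_source_fields('line')               # => ['line']
resolve_source_fields(:source_fields => ['num', 'line'])  # => ['num', 'line']
```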