Aggregations is the context available to you within the block of a group_by, union, or join that allows you to apply Every pipes to the result of those operations. You may apply aggregators and buffers within this context subject to several rules laid out by Cascading.
Rules enforced by Aggregations:
Contains either 1 Buffer or >= 1 Aggregator (explicitly checked)
No GroupBys, CoGroups, Joins, or Merges (methods for these pipes do not exist on Aggregations)
No Eaches (Aggregations#each does not exist)
Aggregations may not branch (Aggregations#branch does not exist)
Externally enforced rules:
May be empty (in which case, Aggregations is not instantiated)
Must follow a GroupBy or CoGroup (not a HashJoin or Merge)
Optimizations:
If the leading Group is a GroupBy and all subsequent Everies are Aggregators that have a corresponding AggregateBy, Aggregations can replace the GroupBy/Aggregator pipe with a single composite AggregateBy
Aggregator and buffer DSL standard optional parameter names:
c.p.Every argument selector
c.o.Operation field declaration
c.p.Every output selector
Builds an every assertion pipe given a c.o.a.Assertion and adds it to the current list of aggregations. Note this breaks a chain of AggregateBys.
The named options are:
The assertion level; defaults to strict.
# File lib/cascading/aggregations.rb, line 115 def assert_group(assertion, options = {}) assertion_level = options[:level] || Java::CascadingOperation::AssertionLevel::STRICT parameters = [tail_pipe, assertion_level, assertion] make_pipe(Java::CascadingPipe::Every, parameters) end
Builds a pipe that asserts the size of the current group is the specified size for all groups.
# File lib/cascading/aggregations.rb, line 124 def assert_group_size_equals(size, options = {}) assertion = Java::CascadingOperationAssertion::AssertGroupSizeEquals.new(size) assert_group(assertion, options) end
Averages the specified fields within each group. Fields may be a list or a map for renaming. Note that fields are sorted by input name when a map is provided.
Examples:
assembly 'aggregate' do ... insert 'const' => 1 group_by 'const' do max 'field1', 'field2' max 'field3' => 'fieldA', 'field4' => 'fieldB' end discard 'const' end
# File lib/cascading/aggregations.rb, line 272 def average(*fields_or_field_map) field_map, _ = extract_field_map(fields_or_field_map) field_map.each do |in_field, out_field| average_aggregator = Java::CascadingOperationAggregator::Average.new(fields(out_field)) average_by = Java::CascadingPipeAssembly::AverageBy.new(fields(in_field), fields(out_field)) every(in_field, :aggregator => average_aggregator, :output => all_fields, :aggregate_by => average_by) end raise "average invoked on 0 fields" if field_map.empty? end
We can replace these aggregations with the corresponding composite AggregateBy if the leading Group was a GroupBy and all subsequent Aggregators had a corresponding AggregateBy (which we’ve encoded in the list of #aggregate_bys being a non-empty array).
# File lib/cascading/aggregations.rb, line 58 def can_aggregate_by? !aggregate_bys.nil? && !aggregate_bys.empty? end
Counts elements of each group. May optionally specify the name of the output count field, which defaults to ‘count’.
Examples:
assembly 'aggregate' do
...
group_by 'key1', 'key2' do
count
count 'key1_key2_count'
end
end
# File lib/cascading/aggregations.rb, line 220 def count(name = 'count') count_aggregator = Java::CascadingOperationAggregator::Count.new(fields(name)) count_by = Java::CascadingPipeAssembly::CountBy.new(fields(name)) every(last_grouping_fields, :aggregator => count_aggregator, :output => all_fields, :aggregate_by => count_by) end
Prints information about the scope of these Aggregations at the point at which it is called. This allows you to trace the propagation of field names through your job and is handy for debugging. See Scope for details.
# File lib/cascading/aggregations.rb, line 50 def debug_scope puts "Current scope of aggregations for '#{assembly.name}':\n #{scope}\n----------\n" end
Builds an every pipe and adds it to the current list of aggregations. Note that this list may be either exactly 1 Buffer or any number of Aggregators. Exactly one of :aggregator or :buffer must be specified and :aggregator may be accompanied by a corresponding :aggregate_by.
The named options are:
A Cascading Aggregator, mutually exclusive with :buffer.
A Cascading AggregateBy that corresponds to the given :aggregator. Only makes sense with the :aggregator option and does not exist for all Aggregators. Providing nothing or nil will cause all Aggregations to operate as normal, without being compiled into a composite AggregateBy.
A Cascading Buffer, mutually exclusive with :aggregator.
c.p.Every output selector.
Example:
every 'field1', 'field2', :aggregator => sum_aggregator, :aggregate_by => sum_by, :output => all_fields every fields(input_fields), :buffer => Java::SomePackage::SomeBuffer.new, :output => all_fields
# File lib/cascading/aggregations.rb, line 90 def every(*args_with_options) options, in_fields = args_with_options.extract_options!, fields(args_with_options) out_fields = fields(options[:output]) operation = options[:aggregator] || options[:buffer] raise 'every requires either :aggregator or :buffer' unless operation if options[:aggregate_by] && aggregate_bys aggregate_bys << options[:aggregate_by] else @aggregate_bys = nil end parameters = [tail_pipe, in_fields, operation, out_fields].compact every = make_pipe(Java::CascadingPipe::Every, parameters) raise ':aggregator specified but c.o.Buffer provided' if options[:aggregator] && every.is_buffer raise ':buffer specified but c.o.Aggregator provided' if options[:buffer] && every.is_aggregator every end
“Fix” out values fields after a sequence of Everies. This is a field name metadata fix which is why the Identity is not planned into the resulting Cascading pipe. Without it, all values fields would propagate through non-empty aggregations, which doesn’t match Cascading’s planner’s behavior.
# File lib/cascading/aggregations.rb, line 67 def finalize discard_each = Java::CascadingPipe::Each.new(tail_pipe, all_fields, Java::CascadingOperation::Identity.new) @scope = Scope.outgoing_scope(discard_each, [scope]) end
Returns the first value within each group for the specified fields. Fields may be a list or a map for renaming. Note that fields are sorted by input name when a map is provided.
The named options are:
Java Array of Tuples which should be ignored
Examples:
assembly 'aggregate' do ... group_by 'key1', 'key2' do first 'field1', 'field2' first 'field3' => 'fieldA', 'field4' => 'fieldB' end end
# File lib/cascading/aggregations.rb, line 186 def first(*args_with_options) composite_aggregator(args_with_options, Java::CascadingOperationAggregator::First) end
Returns the last value within each group for the specified fields. Fields may be a list or a map for renaming. Note that fields are sorted by input name when a map is provided.
The named options are:
Java Array of Tuples which should be ignored
Examples:
assembly 'aggregate' do ... group_by 'key1', 'key2' do last 'field1', 'field2' last 'field3' => 'fieldA', 'field4' => 'fieldB' end end
# File lib/cascading/aggregations.rb, line 205 def last(*args_with_options) composite_aggregator(args_with_options, Java::CascadingOperationAggregator::Last) end
Computes the maxima of the specified fields within each group. Fields may be a list or a map for renaming. Note that fields are sorted by input name when a map is provided.
The named options are:
Java Array of Objects of values to be ignored.
Examples:
assembly 'aggregate' do ... insert 'const' => 1 group_by 'const' do max 'field1', 'field2' max 'field3' => 'fieldA', 'field4' => 'fieldB' end discard 'const' end
# File lib/cascading/aggregations.rb, line 167 def max(*args_with_options) composite_aggregator(args_with_options, Java::CascadingOperationAggregator::Max) end
Computes the minima of the specified fields within each group. Fields may be a list or a map for renaming. Note that fields are sorted by input name when a map is provided.
The named options are:
Java Array of Objects of values to be ignored.
Examples:
assembly 'aggregate' do ... insert 'const' => 1 group_by 'const' do min 'field1', 'field2' min 'field3' => 'fieldA', 'field4' => 'fieldB' end discard 'const' end
# File lib/cascading/aggregations.rb, line 146 def min(*args_with_options) composite_aggregator(args_with_options, Java::CascadingOperationAggregator::Min) end
Sums the specified fields within each group. Fields may be a list or provided through the :mapping option for renaming. Note that fields are sorted by name when a map is provided.
The named options are:
Map of input to output field names if renaming is desired. Results in output fields sorted by input field.
Controls the type of the output, specified using values from the Cascading::JAVA_TYPE_MAP as in Janino expressions (:double, :long, etc.)
Examples:
assembly 'aggregate' do ... group_by 'key1', 'key2' do sum 'field1', 'field2', :type => :long sum :mapping => { 'field3' => 'fieldA', 'field4' => 'fieldB' }, :type => :double end end
# File lib/cascading/aggregations.rb, line 244 def sum(*args_with_options) options, in_fields = args_with_options.extract_options!, args_with_options type = JAVA_TYPE_MAP[options[:type]] mapping = options[:mapping] ? options[:mapping].sort : in_fields.zip(in_fields) mapping.each do |in_field, out_field| sum_aggregator = Java::CascadingOperationAggregator::Sum.new(*[fields(out_field), type].compact) # NOTE: SumBy requires a type in wip-286, unlike Sum (see Sum.java line 42 for default) sum_by = Java::CascadingPipeAssembly::SumBy.new(fields(in_field), fields(out_field), type || Java::double.java_class) every(in_field, :aggregator => sum_aggregator, :output => all_fields, :aggregate_by => sum_by) end raise "sum invoked on 0 fields (note :mapping must be provided to explicitly rename fields)" if mapping.empty? end
Do not use this constructor directly; instead, pass a block containing the desired aggregations to a group_by, union, or join and it will be instantiated for you.
Builds the context in which a sequence of Every aggregations may be evaluated in the given assembly appended to the given group pipe and with the given incoming_scopes.
# File lib/cascading/aggregations.rb, line 37 def initialize(assembly, group, incoming_scopes) @assembly = assembly @tail_pipe = group @scope = Scope.outgoing_scope(tail_pipe, incoming_scopes) # AggregateBy optimization only applies to GroupBy @aggregate_bys = tail_pipe.is_group_by ? [] : nil end