diff --git a/Cargo.lock b/Cargo.lock
index 1c3e07b47b28d..4bb1c2529f75b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3510,7 +3510,7 @@ dependencies = [
 [[package]]
 name = "timely"
 version = "0.11.1"
-source = "git+https://github.com/TimelyDataflow/timely-dataflow#b42542e439327d2504dd2e4a05604a1753a5f191"
+source = "git+https://github.com/TimelyDataflow/timely-dataflow#e37923d78133501006310bae022cec7ebe41dffe"
 dependencies = [
  "abomonation",
  "abomonation_derive",
@@ -3524,12 +3524,12 @@ dependencies = [
 [[package]]
 name = "timely_bytes"
 version = "0.11.0"
-source = "git+https://github.com/TimelyDataflow/timely-dataflow#b42542e439327d2504dd2e4a05604a1753a5f191"
+source = "git+https://github.com/TimelyDataflow/timely-dataflow#e37923d78133501006310bae022cec7ebe41dffe"
 
 [[package]]
 name = "timely_communication"
 version = "0.11.1"
-source = "git+https://github.com/TimelyDataflow/timely-dataflow#b42542e439327d2504dd2e4a05604a1753a5f191"
+source = "git+https://github.com/TimelyDataflow/timely-dataflow#e37923d78133501006310bae022cec7ebe41dffe"
 dependencies = [
  "abomonation",
  "abomonation_derive",
@@ -3544,7 +3544,7 @@ dependencies = [
 [[package]]
 name = "timely_logging"
 version = "0.11.1"
-source = "git+https://github.com/TimelyDataflow/timely-dataflow#b42542e439327d2504dd2e4a05604a1753a5f191"
+source = "git+https://github.com/TimelyDataflow/timely-dataflow#e37923d78133501006310bae022cec7ebe41dffe"
 
 [[package]]
 name = "timely_sort"
diff --git a/src/dataflow/render/context.rs b/src/dataflow/render/context.rs
index 5140d748812ab..58c47538eee32 100644
--- a/src/dataflow/render/context.rs
+++ b/src/dataflow/render/context.rs
@@ -138,6 +138,45 @@ where
         }
     }
 
+    /// Applies logic to elements of a collection and returns the results.
+    ///
+    /// This method allows savvy users to avoid the potential allocation
+    /// required by `self.collection(expr)` when converting data from an
+    /// arrangement; if less data are required, the `logic` argument is able
+    /// to use a reference and produce the minimal amount of data instead.
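+    ///
+    /// For example, a caller that needs only the first datum of each record
+    /// can borrow the stored row and pack just that datum, rather than
+    /// cloning the whole row (a hypothetical caller; `ctx` and `expr` are
+    /// stand-ins):
+    ///
+    /// ```ignore
+    /// let (oks, errs) = ctx
+    ///     .flat_map_ref(&expr, |row| {
+    ///         // Borrow `row`; allocate only the one datum we keep.
+    ///         Some(Row::pack(Some(row.iter().next().unwrap())))
+    ///     })
+    ///     .expect("expression not rendered");
+    /// ```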
+    pub fn flat_map_ref<I, L>(
+        &self,
+        relation_expr: &P,
+        mut logic: L,
+    ) -> Option<(
+        Collection<S, I::Item>,
+        Collection<S, DataflowError>,
+    )>
+    where
+        I: IntoIterator,
+        I::Item: Data,
+        L: FnMut(&V) -> I + 'static,
+    {
+        if let Some((oks, err)) = self.collections.get(relation_expr) {
+            Some((oks.flat_map(move |v| logic(&v)), err.clone()))
+        } else if let Some(local) = self.local.get(relation_expr) {
+            let (oks, errs) = local.values().next().expect("Empty arrangement");
+            Some((
+                oks.flat_map_ref(move |_k, v| logic(v)),
+                errs.as_collection(|k, _v| k.clone()),
+            ))
+        } else if let Some(trace) = self.trace.get(relation_expr) {
+            let (_id, oks, errs) = trace.values().next().expect("Empty arrangement");
+            Some((
+                oks.flat_map_ref(move |_k, v| logic(v)),
+                errs.as_collection(|k, _v| k.clone()),
+            ))
+        } else {
+            None
+        }
+    }
+
     /// Convenience method for accessing `arrangement` when all keys are plain columns
     pub fn arrangement_columns(
         &self,
diff --git a/src/dataflow/render/reduce.rs b/src/dataflow/render/reduce.rs
index 621a7273d0aba..c000d03175e11 100644
--- a/src/dataflow/render/reduce.rs
+++ b/src/dataflow/render/reduce.rs
@@ -20,11 +20,10 @@
 use timely::dataflow::Scope;
 use timely::progress::{timestamp::Refines, Timestamp};
 use dataflow_types::DataflowError;
-use expr::{AggregateExpr, AggregateFunc, RelationExpr, ScalarExpr};
+use expr::{AggregateExpr, AggregateFunc, RelationExpr};
 use repr::{Datum, Row, RowArena, RowPacker};
 use super::context::Context;
-use crate::operator::{CollectionExt, StreamExt};
 use crate::render::context::Arrangement;
 
 impl<G, T> Context<G, RelationExpr, Row, T>
 where
@@ -61,43 +60,109 @@ where
         // applied a single reduction (which should be good for any consumers
         // of the operator and its arrangement).
 
-        let keys_clone = group_key.clone();
         let relation_expr_clone = relation_expr.clone();
-        let (ok_input, err_input) = self.collection(input).unwrap();
 
+        // Our first step is to extract `(key, vals)` from `input`.
+        // We do this carefully, attempting to avoid unnecessary allocations
+        // that would result from cloning rows in input arrangements.
+        let group_key_clone = group_key.clone();
+        let aggregates_clone = aggregates.clone();
+
+        // Tracks the required number of columns to extract.
+        let mut columns_needed = 0;
+        for key in group_key.iter() {
+            for column in key.support() {
+                columns_needed = std::cmp::max(columns_needed, column + 1);
+            }
+        }
+        for aggr in aggregates.iter() {
+            for column in aggr.expr.support() {
+                columns_needed = std::cmp::max(columns_needed, column + 1);
+            }
+        }
+
+        let mut row_packer = RowPacker::new();
+        let (key_val_input, mut err_input): (
+            Collection<_, Result<(Row, Row), DataflowError>, _>,
+            _,
+        ) = self
+            .flat_map_ref(input, move |row| {
+                let temp_storage = RowArena::new();
+                let mut results = Vec::new();
+                // First, evaluate the key selector expressions.
+                // If any of them errors, we produce the error as output and
+                // note that the key was not correctly produced.
+                // TODO(frank): this allocation could be re-used if we were
+                // more clever about where it came from (e.g. allocated with
+                // lifetime `'storage` in `flat_map_ref`).
+                let datums = row.iter().take(columns_needed).collect::<Vec<_>>();
+                for expr in group_key_clone.iter() {
+                    match expr.eval(&datums, &temp_storage) {
+                        Ok(val) => row_packer.push(val),
+                        Err(e) => {
+                            results.push(Err(e.into()));
+                        }
+                    }
+                }
+
+                // Second, evaluate the value selector.
+                // If any error occurs, we produce both the error as output
+                // and a `Datum::Null` value, to avoid causing the later
+                // "ReduceCollation" operator to panic due to absent aggregates.
+                if results.is_empty() {
+                    let key = row_packer.finish_and_reuse();
+                    for aggr in aggregates_clone.iter() {
+                        match aggr.expr.eval(&datums, &temp_storage) {
+                            Ok(val) => {
+                                row_packer.push(val);
+                            }
+                            Err(e) => {
+                                row_packer.push(Datum::Null);
+                                results.push(Err(e.into()));
+                            }
+                        }
+                    }
+                    let row = row_packer.finish_and_reuse();
+                    results.push(Ok((key, row)));
+                } else {
+                    // Clear any key datums already pushed, so that they do
+                    // not leak into the packer's next use.
+                    row_packer.finish_and_reuse();
+                }
+                // Return accumulated results.
+                results
+            })
+            .unwrap();
+
+        // Demux out the potential errors from key and value selector evaluation.
+        use timely::dataflow::operators::ok_err::OkErr;
+        let (ok, err) = key_val_input.inner.ok_err(|(x, t, d)| match x {
+            Ok(x) => Ok((x, t, d)),
+            Err(x) => Err((x, t, d)),
+        });
+
+        let ok_input = ok.as_collection();
+        err_input = err.as_collection().concat(&err_input);
 
         // Distinct is a special case, as there are no aggregates to aggregate.
         // In this case, we use a special implementation that does not rely on
         // collating aggregates.
         let (oks, errs) = if aggregates.is_empty() {
-            let (ok_collection, err_collection) = ok_input.map_fallible({
-                let group_key = group_key.clone();
-                move |row| {
-                    let temp_storage = RowArena::new();
-                    let datums = row.unpack();
-                    let key = Row::try_pack(
-                        group_key.iter().map(|i| i.eval(&datums, &temp_storage)),
-                    )?;
-                    Ok::<_, DataflowError>((key, ()))
-                }
-            });
             (
-                ok_collection.reduce_abelian::<_, OrdValSpine<_, _, _, _>>("DistinctBy", {
+                ok_input.reduce_abelian::<_, OrdValSpine<_, _, _, _>>("DistinctBy", {
                     |key, _input, output| {
                         output.push((key.clone(), 1));
                     }
                 }),
-                err_input.concat(&err_collection),
+                err_input,
             )
         } else if aggregates.len() == 1 {
             // If we have a single aggregate, we need not stage aggregations separately.
-            build_aggregate_stage(ok_input, err_input, group_key, &aggregates[0], true)
+            (
+                build_aggregate_stage(ok_input, 0, &aggregates[0], true),
+                err_input,
+            )
         } else {
             // We'll accumulate partial aggregates here, where each contains updates
             // of the form `(key, (index, value))`. This is eventually concatenated,
             // and fed into a final reduce to put the elements in order.
             let mut ok_partials = Vec::with_capacity(aggregates.len());
-            let mut err_partials = Vec::with_capacity(aggregates.len());
             // Bound the complex dataflow in a region, for better interpretability.
             scope.region(|region| {
                 // Create an iterator over collections, where each is the application
@@ -105,19 +170,13 @@ where
                 // position in the final results. To be followed by a merge reduction.
                 for (index, aggr) in aggregates.iter().enumerate() {
                     // Collect the now-aggregated partial result, annotated with its position.
-                    let (ok_partial, err_partial) = build_aggregate_stage(
-                        ok_input.enter(region),
-                        err_input.enter(region),
-                        group_key,
-                        aggr,
-                        false,
-                    );
+                    let ok_partial =
+                        build_aggregate_stage(ok_input.enter(region), index, aggr, false);
                     ok_partials.push(
                         ok_partial
                             .as_collection(move |key, val| (key.clone(), (index, val.clone())))
                             .leave(),
                     );
-                    err_partials.push(err_partial.leave());
                 }
             });
 
@@ -155,15 +214,14 @@ where
                         result.extend(key.iter());
                         for ((_pos, val), cnt) in input.iter() {
                             assert_eq!(*cnt, 1);
-                            result.push(val.unpack().pop().unwrap());
+                            result.push(val.unpack_first());
                         }
                         output.push((result.finish(), 1));
                     }
                 });
-            let errs = differential_dataflow::collection::concatenate(scope, err_partials);
-            (oks, errs)
+            (oks, err_input)
         };
-        let index = (0..keys_clone.len()).collect::<Vec<_>>();
+        let index = (0..group_key.len()).collect::<Vec<_>>();
         self.set_local_columns(relation_expr, &index[..], (oks, errs.arrange()));
     }
 }
@@ -174,81 +232,31 @@ where
 /// This method accommodates in-place aggregations like sums, hierarchical aggregations like min and max,
 /// and other aggregations that may be neither of those things. It also applies distinctness if required.
 fn build_aggregate_stage<G>(
-    ok_input: Collection<G, Row>,
-    err_input: Collection<G, DataflowError>,
-    group_key: &[ScalarExpr],
+    ok_input: Collection<G, (Row, Row)>,
+    index: usize,
     aggr: &AggregateExpr,
     prepend_key: bool,
-) -> (Arrangement<G, Row>, Collection<G, DataflowError>)
+) -> Arrangement<G, Row>
 where
     G: Scope,
     G::Timestamp: Lattice,
 {
     let AggregateExpr {
         func,
-        expr,
+        expr: _,
         distinct,
     } = aggr.clone();
 
-    // It is important that in the case of an error in the value selector we still provide a
-    // value, so that the aggregation produces an aggregate with the correct key. If we do not,
-    // the `ReduceCollation` operator panics.
-    use timely::dataflow::channels::pact::Pipeline;
-    let (partial, err_partial) =
+    let mut partial = if !prepend_key {
+        // Extract the `index`-th aggregate input from the values row.
+        let mut packer = RowPacker::new();
+        ok_input.map(move |(key, row)| {
+            let value = row.iter().nth(index).unwrap();
+            packer.push(value);
+            (key, packer.finish_and_reuse())
+        })
+    } else {
         ok_input
-            .inner
-            .unary_fallible(Pipeline, "ReduceStagePreparation", |_cap, _info| {
-                let group_key = group_key.to_vec();
-                let mut storage = Vec::new();
-                move |input, ok_output, err_output| {
-                    input.for_each(|time, data| {
-                        let temp_storage = RowArena::new();
-                        let mut ok_session = ok_output.session(&time);
-                        let mut err_session = err_output.session(&time);
-                        data.swap(&mut storage);
-                        for (row, t, diff) in storage.drain(..) {
-                            // First, evaluate the key selector expressions.
-                            // If any error we produce their errors as output and note
-                            // the fact that the key was not correctly produced.
-                            let mut key_packer = RowPacker::new();
-                            let mut error_free = true;
-                            let datums = row.unpack();
-                            for expr in group_key.iter() {
-                                match expr.eval(&datums, &temp_storage) {
-                                    Ok(val) => key_packer.push(val),
-                                    Err(e) => {
-                                        err_session.give((e.into(), t.clone(), diff));
-                                        error_free = false;
-                                    }
-                                }
-                            }
-                            // Second, evaluate the value selector.
-                            // If any error occurs we produce both the error as output,
-                            // but also a `Datum::Null` value to avoid causing the later
-                            // "ReduceCollation" operator to panic due to absent aggregates.
-                            if error_free {
-                                let key = key_packer.finish();
-                                match expr.eval(&datums, &temp_storage) {
-                                    Ok(val) => {
-                                        ok_session.give(((key, Row::pack(Some(val))), t, diff));
-                                    }
-                                    Err(e) => {
-                                        ok_session.give((
-                                            (key, Row::pack(Some(Datum::Null))),
-                                            t.clone(),
-                                            diff,
-                                        ));
-                                        err_session.give((e.into(), t, diff));
-                                    }
-                                }
-                            }
-                        }
-                    })
-                }
-            });
-
-    let mut partial = partial.as_collection();
-    let err_partial = err_partial.as_collection();
+    };
 
     // If `distinct` is set, we restrict ourselves to the distinct `(key, val)`.
     if distinct {
@@ -301,7 +309,7 @@ where
         })
     };
 
-    (ok_out, err_input.concat(&err_partial))
+    ok_out
 }
 
 /// Builds the dataflow for a reduction that can be performed in-place.
@@ -319,6 +327,7 @@ where
     G: Scope,
     G::Timestamp: Lattice,
 {
+    use differential_dataflow::operators::consolidate::ConsolidateStream;
     use timely::dataflow::operators::map::Map;
 
     let float_scale = f64::from(1 << 24);
@@ -330,7 +339,7 @@
         .explode({
             let aggr = aggr.clone();
             move |(key, row)| {
-                let datum = row.unpack()[0];
+                let datum = row.unpack_first();
                 let (aggs, nonnulls) = match aggr {
                     AggregateFunc::CountAll => {
                         // Nothing beyond the accumulated count is needed.
@@ -373,6 +382,7 @@
                 ))
             }
         })
+        .consolidate_stream()
        .reduce_abelian::<_, OrdValSpine<_, _, _, _>>(
            "ReduceAccumulable",
            move |key, input, output| {
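The one genuinely new dataflow idiom above is the error demux in `reduce.rs`: the fused key/value evaluation emits a collection of `Result`s, and timely's `OkErr` operator splits it back into an ok collection and an err collection. Below is a minimal, self-contained sketch of that pattern; the data and names are illustrative, while `ok_err`, `to_stream`, and `as_collection` are the real timely/differential APIs used in the diff.

```rust
use differential_dataflow::AsCollection;
use timely::dataflow::operators::ok_err::OkErr;
use timely::dataflow::operators::ToStream;

fn main() {
    timely::example(|scope| {
        // A stream of `(Result<value, error>, time, diff)` update triples,
        // standing in for the `key_val_input.inner` stream above.
        let input = vec![
            (Ok(1_i64), 0_u64, 1_isize),
            (Err("bad record".to_string()), 0, 1),
            (Ok(2), 0, 1),
        ]
        .to_stream(scope);

        // Route each update left or right based on its payload, keeping
        // the `(time, diff)` pair attached to whichever side it goes.
        let (ok, err) = input.ok_err(|(x, t, d)| match x {
            Ok(x) => Ok((x, t, d)),
            Err(e) => Err((e, t, d)),
        });

        // Reinterpret both streams of triples as differential collections.
        ok.as_collection().inspect(|x| println!("ok:  {:?}", x));
        err.as_collection().inspect(|x| println!("err: {:?}", x));
    });
}
```

Splitting the errors out before the reduction keeps them out of the arrangements entirely, which is why `err_input` can simply be concatenated and passed through rather than threaded into every aggregation stage as before.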