Reorganize reduce to minimize allocations #3130
Changes from all commits: b2fecc6, 61ae078, 58d3666, 899fb96, 4eb0b3f
@@ -138,6 +138,45 @@ where
        }
    }

    /// Applies logic to elements of a collection and returns the results.
    ///
    /// This method allows savvy users to avoid the potential allocation
    /// required by `self.collection(expr)` when converting data from an
    /// arrangement; if less data are required, the `logic` argument is able
    /// to use a reference and produce the minimal amount of data instead.
    pub fn flat_map_ref<I, L>(
        &self,
        relation_expr: &P,
        mut logic: L,
    ) -> Option<(
        Collection<S, I::Item, Diff>,
        Collection<S, DataflowError, Diff>,
    )>
    where
        I: IntoIterator,
        I::Item: Data,
        L: FnMut(&V) -> I + 'static,
    {
        if let Some((oks, err)) = self.collections.get(relation_expr) {
            Some((oks.flat_map(move |v| logic(&v)), err.clone()))
        } else if let Some(local) = self.local.get(relation_expr) {
            let (oks, errs) = local.values().next().expect("Empty arrangement");
            Some((
                oks.flat_map_ref(move |_k, v| logic(v)),
                errs.as_collection(|k, _v| k.clone()),
            ))
        } else if let Some(trace) = self.trace.get(relation_expr) {
            let (_id, oks, errs) = trace.values().next().expect("Empty arrangement");
            Some((
                // oks.as_collection(|_k, v| v.clone()),

Review comment: stray comment?
Reply: Sure is! Thanks. :D

                oks.flat_map_ref(move |_k, v| logic(v)),
                errs.as_collection(|k, _v| k.clone()),
            ))
        } else {
            None
        }
    }

    /// Convenience method for accessing `arrangement` when all keys are plain columns.
    pub fn arrangement_columns(
        &self,
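
A minimal usage sketch of the new `flat_map_ref` above. The call site is hypothetical and not part of this diff; `context` and `relation_expr` are assumed to be in scope. The closure borrows each row and packs only the one needed column, rather than cloning whole rows as `self.collection(expr)` would.

    // Hypothetical usage sketch, not part of this diff: extract only the
    // first column of each row from whichever arrangement backs
    // `relation_expr`, avoiding a whole-row clone.
    let (oks, errs) = context
        .flat_map_ref(&relation_expr, |row: &Row| {
            // Borrow the row and materialize just the single needed datum.
            Some(Row::pack(Some(row.unpack_first())))
        })
        .expect("relation_expr has been rendered");
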
@@ -20,11 +20,10 @@ use timely::dataflow::Scope;
use timely::progress::{timestamp::Refines, Timestamp};

use dataflow_types::DataflowError;
use expr::{AggregateExpr, AggregateFunc, RelationExpr, ScalarExpr};
use expr::{AggregateExpr, AggregateFunc, RelationExpr};
use repr::{Datum, Row, RowArena, RowPacker};

use super::context::Context;
use crate::operator::{CollectionExt, StreamExt};
use crate::render::context::Arrangement;

impl<G, T> Context<G, RelationExpr, Row, T>
@@ -61,63 +60,123 @@ where
        // applied a single reduction (which should be good for any consumers
        // of the operator and its arrangement).

        let keys_clone = group_key.clone();
        let relation_expr_clone = relation_expr.clone();

        let (ok_input, err_input) = self.collection(input).unwrap();
        // Our first step is to extract `(key, vals)` from `input`.
        // We do this carefully, attempting to avoid unnecessary allocations
        // that would result from cloning rows in input arrangements.
        let group_key_clone = group_key.clone();
        let aggregates_clone = aggregates.clone();

        // Tracks the required number of columns to extract.
        let mut columns_needed = 0;
        for key in group_key.iter() {
            for column in key.support() {
                columns_needed = std::cmp::max(columns_needed, column + 1);
            }
        }
        for aggr in aggregates.iter() {
            for column in aggr.expr.support() {
                columns_needed = std::cmp::max(columns_needed, column + 1);
            }
        }

        let mut row_packer = RowPacker::new();
        let (key_val_input, mut err_input): (
            Collection<_, Result<(Row, Row), DataflowError>, _>,
            _,
        ) = self
            .flat_map_ref(input, move |row| {
                let temp_storage = RowArena::new();
                let mut results = Vec::new();
                // First, evaluate the key selector expressions.
                // If any of them error, we produce their errors as output and
                // note the fact that the key was not correctly produced.
                // TODO(frank): this allocation could be re-used if we were
                // more clever about where it came from (e.g. allocated with
                // lifetime `'storage` in `flat_map_ref`).
                let datums = row.iter().take(columns_needed).collect::<Vec<_>>();

Review comment: can we create the vector outside the closure (then clear/fill it from the iter inside) in order to reuse the allocation?
Reply: We cannot! At least not without a more advanced implementation. Roughly: the lifetime in there is that of the borrowed row. I'll put a note that we could look into it, though!
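
A standalone sketch of the constraint described in the reply above. The types here are simplified stand-ins, not the real `Row` and `Datum`: a buffer of borrowed values can be built inside each closure call, but a buffer captured by the closure would have to outlive every row it borrows from, which the borrow checker rejects.

    // Simplified stand-in types; the real code borrows `Datum`s from a `Row`.
    struct Row(Vec<i64>);

    fn main() {
        let rows = vec![Row(vec![1, 2, 3]), Row(vec![4, 5, 6])];

        // Fine: the buffer of borrowed values lives only for the duration of
        // the call, bounded by the borrow of `row`.
        let per_call = |row: &Row| {
            let datums: Vec<&i64> = row.0.iter().collect();
            datums.len()
        };

        // The hoisted variant asked about in the review would look like the
        // commented lines below; it does not compile, because the captured
        // buffer would need to outlive every `row` passed to the closure.
        //
        // let mut buffer: Vec<&i64> = Vec::new();
        // let hoisted = move |row: &Row| {
        //     buffer.clear();
        //     buffer.extend(row.0.iter());
        //     buffer.len()
        // };

        for row in &rows {
            println!("{}", per_call(row));
        }
    }
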
                for expr in group_key_clone.iter() {
                    match expr.eval(&datums, &temp_storage) {
                        Ok(val) => row_packer.push(val),
                        Err(e) => {
                            results.push(Err(e.into()));
                        }
                    }
                }

                // Second, evaluate the value selector.
                // If any error occurs, we produce both the error as output and
                // a `Datum::Null` value, to avoid causing the later
                // "ReduceCollation" operator to panic due to absent aggregates.
                if results.is_empty() {
                    let key = row_packer.finish_and_reuse();
                    for aggr in aggregates_clone.iter() {
                        match aggr.expr.eval(&datums, &temp_storage) {
                            Ok(val) => {
                                row_packer.push(val);
                            }
                            Err(e) => {
                                row_packer.push(Datum::Null);
                                results.push(Err(e.into()));
                            }
                        }
                    }
                    let row = row_packer.finish_and_reuse();
                    results.push(Ok((key, row)));
                }
                // Return accumulated results.
                results
            })
            .unwrap();

        // Demux out the potential errors from key and value selector evaluation.
        use timely::dataflow::operators::ok_err::OkErr;
        let (ok, err) = key_val_input.inner.ok_err(|(x, t, d)| match x {
            Ok(x) => Ok((x, t, d)),
            Err(x) => Err((x, t, d)),
        });

        let ok_input = ok.as_collection();
        err_input = err.as_collection().concat(&err_input);

        // Distinct is a special case, as there are no aggregates to aggregate.
        // In this case, we use a special implementation that does not rely on
        // collating aggregates.
        let (oks, errs) = if aggregates.is_empty() {
            let (ok_collection, err_collection) = ok_input.map_fallible({
                let group_key = group_key.clone();
                move |row| {
                    let temp_storage = RowArena::new();
                    let datums = row.unpack();
                    let key = Row::try_pack(
                        group_key.iter().map(|i| i.eval(&datums, &temp_storage)),
                    )?;
                    Ok::<_, DataflowError>((key, ()))
                }
            });
            (
                ok_collection.reduce_abelian::<_, OrdValSpine<_, _, _, _>>("DistinctBy", {
                ok_input.reduce_abelian::<_, OrdValSpine<_, _, _, _>>("DistinctBy", {
                    |key, _input, output| {
                        output.push((key.clone(), 1));
                    }
                }),
                err_input.concat(&err_collection),
                err_input,
            )
        } else if aggregates.len() == 1 {
            // If we have a single aggregate, we need not stage aggregations separately.
            build_aggregate_stage(ok_input, err_input, group_key, &aggregates[0], true)
            (
                build_aggregate_stage(ok_input, 0, &aggregates[0], true),
                err_input,
            )
        } else {
            // We'll accumulate partial aggregates here, where each contains updates
            // of the form `(key, (index, value))`. This is eventually concatenated,
            // and fed into a final reduce to put the elements in order.
            let mut ok_partials = Vec::with_capacity(aggregates.len());
            let mut err_partials = Vec::with_capacity(aggregates.len());
            // Bound the complex dataflow in a region, for better interpretability.
            scope.region(|region| {
                // Create an iterator over collections, where each is the application
                // of one aggregation function whose results are annotated with its
                // position in the final results. To be followed by a merge reduction.
                for (index, aggr) in aggregates.iter().enumerate() {
                    // Collect the now-aggregated partial result, annotated with its position.
                    let (ok_partial, err_partial) = build_aggregate_stage(
                        ok_input.enter(region),
                        err_input.enter(region),
                        group_key,
                        aggr,
                        false,
                    );
                    let ok_partial =
                        build_aggregate_stage(ok_input.enter(region), index, aggr, false);
                    ok_partials.push(
                        ok_partial
                            .as_collection(move |key, val| (key.clone(), (index, val.clone())))
                            .leave(),
                    );
                    err_partials.push(err_partial.leave());
                }
            });

@@ -155,15 +214,14 @@ where
                    result.extend(key.iter());
                    for ((_pos, val), cnt) in input.iter() {
                        assert_eq!(*cnt, 1);
                        result.push(val.unpack().pop().unwrap());
                        result.push(val.unpack_first());
                    }
                    output.push((result.finish(), 1));
                }
            });
            let errs = differential_dataflow::collection::concatenate(scope, err_partials);
            (oks, errs)
            (oks, err_input)
        };
        let index = (0..keys_clone.len()).collect::<Vec<_>>();
        let index = (0..group_key.len()).collect::<Vec<_>>();
        self.set_local_columns(relation_expr, &index[..], (oks, errs.arrange()));
    }
}

@@ -174,81 +232,31 @@ where
/// This method accommodates in-place aggregations like sums, hierarchical aggregations like min and max,
/// and other aggregations that may be neither of those things. It also applies distinctness if required.
fn build_aggregate_stage<G>(
    ok_input: Collection<G, Row>,
    err_input: Collection<G, DataflowError>,
    group_key: &[ScalarExpr],
    ok_input: Collection<G, (Row, Row)>,
    index: usize,
    aggr: &AggregateExpr,
    prepend_key: bool,
) -> (Arrangement<G, Row>, Collection<G, DataflowError>)
) -> Arrangement<G, Row>
where
    G: Scope,
    G::Timestamp: Lattice,
{
    let AggregateExpr {
        func,
        expr,
        expr: _,
        distinct,
    } = aggr.clone();

    // It is important that in the case of an error in the value selector we still provide a
    // value, so that the aggregation produces an aggregate with the correct key. If we do not,
    // the `ReduceCollation` operator panics.
    use timely::dataflow::channels::pact::Pipeline;
    let (partial, err_partial) =
    let mut partial = if !prepend_key {
        let mut packer = RowPacker::new();
        ok_input.map(move |(key, row)| {
            let value = row.iter().nth(index).unwrap();
            packer.push(value);
            (key, packer.finish_and_reuse())
        })
    } else {
        ok_input
            .inner
            .unary_fallible(Pipeline, "ReduceStagePreparation", |_cap, _info| {
                let group_key = group_key.to_vec();
                let mut storage = Vec::new();
                move |input, ok_output, err_output| {
                    input.for_each(|time, data| {
                        let temp_storage = RowArena::new();
                        let mut ok_session = ok_output.session(&time);
                        let mut err_session = err_output.session(&time);
                        data.swap(&mut storage);
                        for (row, t, diff) in storage.drain(..) {
                            // First, evaluate the key selector expressions.
                            // If any error we produce their errors as output and note
                            // the fact that the key was not correctly produced.
                            let mut key_packer = RowPacker::new();
                            let mut error_free = true;
                            let datums = row.unpack();
                            for expr in group_key.iter() {
                                match expr.eval(&datums, &temp_storage) {
                                    Ok(val) => key_packer.push(val),
                                    Err(e) => {
                                        err_session.give((e.into(), t.clone(), diff));
                                        error_free = false;
                                    }
                                }
                            }
                            // Second, evaluate the value selector.
                            // If any error occurs we produce both the error as output,
                            // but also a `Datum::Null` value to avoid causing the later
                            // "ReduceCollation" operator to panic due to absent aggregates.
                            if error_free {
                                let key = key_packer.finish();
                                match expr.eval(&datums, &temp_storage) {
                                    Ok(val) => {
                                        ok_session.give(((key, Row::pack(Some(val))), t, diff));
                                    }
                                    Err(e) => {
                                        ok_session.give((
                                            (key, Row::pack(Some(Datum::Null))),
                                            t.clone(),
                                            diff,
                                        ));
                                        err_session.give((e.into(), t, diff));
                                    }
                                }
                            }
                        }
                    })
                }
            });

    let mut partial = partial.as_collection();
    let err_partial = err_partial.as_collection();
    };

    // If `distinct` is set, we restrict ourselves to the distinct `(key, val)`.
    if distinct {

@@ -301,7 +309,7 @@ where
        })
    };

    (ok_out, err_input.concat(&err_partial))
    ok_out
}

/// Builds the dataflow for a reduction that can be performed in-place.

@@ -319,6 +327,7 @@ where
    G: Scope,
    G::Timestamp: Lattice,
{
    use differential_dataflow::operators::consolidate::ConsolidateStream;
    use timely::dataflow::operators::map::Map;

    let float_scale = f64::from(1 << 24);

@@ -330,7 +339,7 @@ where
        .explode({
            let aggr = aggr.clone();
            move |(key, row)| {
                let datum = row.unpack()[0];
                let datum = row.unpack_first();
                let (aggs, nonnulls) = match aggr {
                    AggregateFunc::CountAll => {
                        // Nothing beyond the accumulated count is needed.

@@ -373,6 +382,7 @@ where
                ))
            }
        })
        .consolidate_stream()
        .reduce_abelian::<_, OrdValSpine<_, _, _, _>>(
            "ReduceAccumulable",
            move |key, input, output| {

Review comment: I was trying to sort out whether this results in a per-row copy. Am I correct in thinking that it doesn't, because there will only be one element in `pushers` here? https://github.com/TimelyDataflow/timely-dataflow/blob/master/timely/src/dataflow/channels/pushers/tee.rs#L19

Reply: I'm not sure! At least, I think we could have clones if someone calls `flat_map_ref` multiple times and it resolves to the first case. There is a potential improvement to make to timely when you can push a `FnMut(&Record) -> Result` upstream, but this is a trickier discussion. A clone should only occur for the second and subsequent cases where a stream actually gets attached to a downstream thing (that is what registers something in the Tee).

Not sure if this helps clarify. Feel free to ask more about it, though! Here, or interactively.
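
A hypothetical illustration of the point in the reply above. The snippet is not from this PR and only assumes the `timely` crate: a stream with a single downstream consumer can hand records along without copying, while each additional consumer attached to the same stream registers another pusher in the Tee and therefore receives its own copy.

    // Hypothetical illustration, not part of this diff: one stream with two
    // downstream consumers. The Tee behind `stream` only needs to clone data
    // once a second consumer is attached.
    use timely::dataflow::operators::{Inspect, Map, ToStream};

    fn main() {
        timely::example(|scope| {
            let stream = (0..5).to_stream(scope);

            // First consumer: no extra copy required for this attachment alone.
            stream.map(|x| x + 1).inspect(|x| println!("plus one: {}", x));

            // Second consumer on the same stream: the Tee now has two pushers,
            // so each record is cloned for the additional channel.
            stream.map(|x| x * 2).inspect(|x| println!("doubled: {}", x));
        });
    }
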