Reorganize reduce to minimize allocations #3130
The most recent commit improves the times by fixing an off-by-one error: for
} else if let Some(trace) = self.trace.get(relation_expr) {
    let (_id, oks, errs) = trace.values().next().expect("Empty arrangement");
    Some((
        // oks.as_collection(|_k, v| v.clone()),
stray comment?
Sure is! Thanks. :D
generally LGTM
    L: FnMut(&V) -> I + 'static,
{
    if let Some((oks, err)) = self.collections.get(relation_expr) {
        Some((oks.flat_map(move |v| logic(&v)), err.clone()))
I was trying to sort out whether this results in a per-row copy. Am I correct in thinking that it doesn't, because there will only be one element in pushers here? https://github.com/TimelyDataflow/timely-dataflow/blob/master/timely/src/dataflow/channels/pushers/tee.rs#L19
I'm not sure! At least, I think we could have clones if someone calls flat_map_ref multiple times and it resolves to the first case. There is a potential improvement to make to timely when you can push a FnMut(&Record)->Result upstream, but this is a trickier discussion. A clone should only occur for the second and later cases where a stream actually gets attached to a downstream thing (that is what registers something in the Tee).
Not sure if this helps clarify. Feel free to ask more about it though! Here, or interactively.
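To make the "one element in pushers" question concrete, here is a toy model of a fan-out that clones a batch for every consumer except the last, which receives the original. This is only a sketch of the behavior described in the thread: `ToyTee`, its fields, and its methods are hypothetical names, and it is not timely's actual `Tee` implementation.

```rust
/// A toy fan-out that hands one batch to every registered consumer.
/// Simplified, hypothetical model; not timely's actual `Tee`.
struct ToyTee<T> {
    /// Each consumer simply collects the batches it receives.
    consumers: Vec<Vec<Vec<T>>>,
    /// Number of batch clones performed so far.
    clones: usize,
}

impl<T: Clone> ToyTee<T> {
    fn new(consumers: usize) -> Self {
        ToyTee { consumers: vec![Vec::new(); consumers], clones: 0 }
    }

    /// Clone the batch for all consumers but the last; the last gets the original.
    fn push(&mut self, batch: Vec<T>) {
        if let Some((last, rest)) = self.consumers.split_last_mut() {
            for consumer in rest.iter_mut() {
                consumer.push(batch.clone());
                self.clones += 1;
            }
            last.push(batch);
        }
    }
}
```

Under this model a single attached consumer never triggers a clone, while each additional consumer costs one clone per batch.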
src/dataflow/render/reduce.rs
Outdated
// Our first step is to extract `(key, vals)` from `input`.
// We do this carefully, attempting to avoid unnecessary allocations
// that would result from cloning rows in input arrangements.
let group_key_owned = group_key.to_vec();
Not sure why two separate clones of the key vector are needed. You already have keys_clone above, which is only ever used for its length on line 222. At any rate, we should be consistent about whether the naming convention is _owned or _clone.
It seems we can just delete the first one and use group_key.len() later on!
"At any rate, we should be consistent about whether the naming convention is _owned or _clone."
I'll try to sort this out. Two different things are happening though: in one, a "clone", and in the second the more explicit conversion to an owned vector (which can be gotten by calling clone(), but then you have to check the type to see whether you cloned the reference, or what you got; to_vec() being relatively unambiguous).
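A small sketch of the ambiguity mentioned above, assuming `group_key` is a `&[usize]` (the helper names here are hypothetical and not from the PR). Calling `.clone()` on a slice reference copies the reference, whereas `.to_vec()` always allocates an owned vector.

```rust
/// `.clone()` on a `&[usize]` resolves to cloning the reference itself,
/// so the result still points at the caller's data and nothing is allocated.
#[allow(noop_method_call)]
fn clone_of_slice_ref(group_key: &[usize]) -> &[usize] {
    group_key.clone()
}

/// `.to_vec()` allocates a new, owned `Vec` with a copy of the elements.
fn owned_copy(group_key: &[usize]) -> Vec<usize> {
    group_key.to_vec()
}
```

The return types make the difference visible: the first function can only hand back a borrow, the second hands back an owned value that a `move` closure can keep.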
// First, evaluate the key selector expressions.
// If any error we produce their errors as output and note
// the fact that the key was not correctly produced.
let datums = row.iter().take(columns_needed).collect::<Vec<_>>();
can we create the vector outside the closure (then clear/fill it from the iter inside) in order to reuse the allocation?
We cannot! At least not without a more advanced implementation. Roughly: the lifetime in there is that of 'storage, which is the lifetime of the &Row we get to look at and the lifetime of the associated Datum types. That comes into existence in the flat_map_ref call. With some more work (I tried, and I think I walked away from it) you can stash a vector like that, and then offer it for use, but it's horrible in the case of "not an arrangement" because there is no such storage and in that case you need to re-alloc a vector for each Row.
I'll put a note that we could look into it though!
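For reference, the clear-and-refill pattern the reviewer has in mind looks roughly like the sketch below. It works here only because the scratch buffer holds owned `i64` values; in the PR the buffer would hold `Datum<'storage>` values that borrow from each row, which is the obstacle described above. The function name and the use of plain `Vec<i64>` rows are assumptions made for illustration.

```rust
/// Sums the first `columns_needed` values of each row, reusing one scratch
/// buffer across rows instead of allocating a fresh vector per row.
fn sum_prefixes(rows: &[Vec<i64>], columns_needed: usize) -> Vec<i64> {
    // Allocated once, outside the per-row closure.
    let mut scratch: Vec<i64> = Vec::new();
    rows.iter()
        .map(|row| {
            // `clear` keeps the capacity, so refilling does not reallocate
            // once the buffer has grown to `columns_needed` elements.
            scratch.clear();
            scratch.extend(row.iter().take(columns_needed).copied());
            scratch.iter().sum::<i64>()
        })
        .collect()
}
```

Because the buffer's element type names no lifetime, it can outlive any single row; a buffer of borrowed values would need a type that mentions the borrow's lifetime before that borrow exists.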
src/dataflow/render/reduce.rs
Outdated
}

let mut row_packer = RowPacker::new();
let (key_input, mut err_input): (
I don't understand why this is called key_input despite computing both the key and the value.
I'm sure not for any great reason. Will fix!
src/dataflow/render/reduce.rs
Outdated
// Demux out the potential errors from key and value selector evaluation.
use timely::dataflow::operators::ok_err::OkErr;
let (ok, err) = key_input.inner.ok_err(|(x, t, d)| match x {
FYI, the comment here is wrong (the closure returns a Result, not a bool).
Oh yeah that comment was totally copy/pasted (badly) from branch.rs. Oops. Thanks!
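As a plain-Rust illustration of the point about the closure's return type, the sketch below splits a batch into two outputs using a closure that returns a `Result`, rather than a `bool` as a branch-style operator would. The `demux` helper is hypothetical and operates on a `Vec`; it is not timely's `ok_err` operator, only a model of the same routing idea.

```rust
/// Routes each item to one of two outputs according to a closure that
/// returns `Ok(..)` or `Err(..)`. Hypothetical helper for illustration.
fn demux<T, O, E, L>(input: Vec<T>, mut logic: L) -> (Vec<O>, Vec<E>)
where
    L: FnMut(T) -> Result<O, E>,
{
    let mut oks = Vec::new();
    let mut errs = Vec::new();
    for item in input {
        match logic(item) {
            Ok(ok) => oks.push(ok),
            Err(err) => errs.push(err),
        }
    }
    (oks, errs)
}
```

Unlike a boolean predicate, the `Result` lets the two outputs carry different types, which is what allows the error side to hold error values while the ok side holds rows.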
Several improvements to reduce.rs to improve primarily start-up time, though probably throughput as well.
On 8 workers, a select count(*) from 10M records is (on master):
After the fixes here:
There are a few flavors of fixes. The most invasive is pivoting away from Context::collection(expr) to a method flat_map_ref that acts as a flatmap on a reference to an arrangement's data. This allows us to avoid a full clone of the data, and push reduction all the way back to where we extract it from an arrangement. This can and should be used elsewhere we extract data from arrangements and intend to filter or project it.
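The difference between cloning everything and then transforming, versus applying the logic to a reference, can be sketched without any dataflow machinery. Everything below is a simplified stand-in under stated assumptions: `CountedRow` is a hypothetical type that counts its own clones, and these free functions over slices only mimic the shape of the `flat_map_ref` idea, not the method added in this PR.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// A stand-in row that records how often it is cloned (hypothetical type).
struct CountedRow {
    values: Vec<i64>,
    clones: Rc<Cell<usize>>,
}

impl Clone for CountedRow {
    fn clone(&self) -> Self {
        self.clones.set(self.clones.get() + 1);
        CountedRow { values: self.values.clone(), clones: Rc::clone(&self.clones) }
    }
}

/// Clone every row first, then apply `logic` to the owned copies.
fn clone_then_flat_map<I, L>(rows: &[CountedRow], mut logic: L) -> Vec<I::Item>
where
    I: IntoIterator,
    L: FnMut(CountedRow) -> I,
{
    rows.iter().cloned().flat_map(|row| logic(row)).collect()
}

/// Apply `logic` to a reference to each row; only its output is materialized.
fn flat_map_ref<I, L>(rows: &[CountedRow], mut logic: L) -> Vec<I::Item>
where
    I: IntoIterator,
    L: FnMut(&CountedRow) -> I,
{
    rows.iter().flat_map(|row| logic(row)).collect()
}
```

With a filter or projection as `logic`, the by-reference version allocates only for what survives, while the clone-first version pays for a full copy of every row regardless of how much is kept.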