
Conversation


@frankmcsherry (Contributor) commented May 22, 2020

Several improvements to reduce.rs, aimed primarily at start-up time, though probably throughput as well.

On 8 workers, a select count(*) over 10M records takes (on master):

materialize=> select count(*) from upsert_text_source;
  count  
---------
 9556820
(1 row)

Time: 3240.513 ms (00:03.241)
materialize=> 

After the fixes here:

materialize=> select count(*) from upsert_text_source;
  count  
---------
 9556820
(1 row)

Time: 1255.278 ms (00:01.255)
materialize=> 

There are a few flavors of fixes. The most invasive is pivoting away from Context::collection(expr) to a method flat_map_ref that acts as a flat map over a reference to an arrangement's data. This allows us to avoid a full clone of the data, and to push the reduction all the way back to where we extract the data from an arrangement. This can and should be used anywhere else we extract data from arrangements and intend to filter or project it.
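To make the shape of the change concrete, here is a small standalone illustration of the pattern (a simplified stand-in Row type, not the actual Context API or the arrangement types): the closure handed to a flat_map_ref-style method only ever sees a reference to the stored record, so filtering and projection happen before anything is cloned.

#[derive(Clone, Debug, PartialEq)]
struct Row(Vec<i64>);

fn main() {
    let arranged: Vec<Row> = vec![Row(vec![1, 10]), Row(vec![2, 20]), Row(vec![3, 30])];

    // Old shape: clone every row out of the arrangement, then project.
    let cloned_then_projected: Vec<i64> =
        arranged.iter().cloned().map(|row| row.0[1]).collect();

    // flat_map_ref shape: the closure sees only `&Row` and yields owned output,
    // so the full row is never cloned; only the surviving values are allocated.
    let projected: Vec<i64> = arranged
        .iter()
        .flat_map(|row: &Row| Some(row.0[1]))
        .collect();

    assert_eq!(cloned_then_projected, projected);
}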



@frankmcsherry requested a review from umanwizard May 22, 2020 19:11
@frankmcsherry (Contributor Author) commented:

The most recent commit improves the times by fixing an off-by-one error: for count(*) we would unpack the first field, which was not necessary and caused us to dereference memory we did not need to touch. Probably not an improvement for anything other than select count(*) from ...

materialize=> select count(*) from upsert_text_source;
  count  
---------
 9556820
(1 row)

Time: 949.272 ms
materialize=> 
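To illustrate the off-by-one fixed above (a hypothetical sketch; the types below are stand-ins, not the Materialize Row API): the per-row closure should decode exactly as many leading columns as the aggregate reads, and for count(*) that number is zero, so no row memory needs to be touched at all.

// Decode only the columns the aggregate actually reads.
fn needed_datums<'a>(row: &'a [i64], columns_needed: usize) -> Vec<&'a i64> {
    row.iter().take(columns_needed).collect()
}

fn main() {
    let row = vec![7, 8, 9];
    // count(*) reads no columns, so nothing is dereferenced.
    assert!(needed_datums(&row, 0).is_empty());
    // The off-by-one amounted to asking for one column regardless.
    assert_eq!(needed_datums(&row, 1), vec![&7]);
}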

} else if let Some(trace) = self.trace.get(relation_expr) {
    let (_id, oks, errs) = trace.values().next().expect("Empty arrangement");
    Some((
        // oks.as_collection(|_k, v| v.clone()),
Contributor:

stray comment?

Contributor Author:

Sure is! Thanks. :D

@umanwizard (Contributor) left a comment:

generally LGTM

    L: FnMut(&V) -> I + 'static,
{
    if let Some((oks, err)) = self.collections.get(relation_expr) {
        Some((oks.flat_map(move |v| logic(&v)), err.clone()))
Contributor:

I was trying to sort out whether this results in a per-row copy. Am I correct in thinking that it doesn't, because there will only be one element in pushers here? https://github.com/TimelyDataflow/timely-dataflow/blob/master/timely/src/dataflow/channels/pushers/tee.rs#L19

Contributor Author:

I'm not sure! At least, I think we could have clones if someone calls flat_map_ref multiple times and it resolves to the first case. There is a potential improvement to make to timely, where you could push a FnMut(&Record) -> Result upstream, but that is a trickier discussion. A clone should only occur for the second and subsequent cases where a stream actually gets attached to something downstream (that is what registers something in the Tee).

Not sure if this helps clarify. Feel free to ask more about it though! Here, or interactively.

// Our first step is to extract `(key, vals)` from `input`.
// We do this carefully, attempting to avoid unnecessary allocations
// that would result from cloning rows in input arrangements.
let group_key_owned = group_key.to_vec();
Contributor:

Not sure why two separate clones of the key vector are needed. You already have keys_clone above, which is only ever used for its length on line 222. At any rate, we should be consistent about whether the naming convention is _owned or _clone.

Contributor Author:

It seems we can just delete the first one and use group_key.len() later on!

Contributor Author:

> At any rate, we should be consistent about whether the naming convention is _owned or _clone.

I'll try to sort this out. Two different things are happening, though: in one case a "clone", and in the second the more explicit conversion to an owned vector (which can also be gotten by calling clone(), but then you have to check the type to see whether you cloned the reference or the data; to_vec() is relatively unambiguous).
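For what it's worth, a tiny standalone illustration of the ambiguity (simplified types, not the PR's): on a borrowed slice, clone() just copies the reference, while to_vec() unambiguously produces an owned vector.

fn main() {
    let group_key: &[usize] = &[0, 2, 5];

    // clone() on a shared slice reference copies the reference; you have to
    // check the resulting type to see that no data was cloned.
    let still_borrowed: &[usize] = group_key.clone();

    // to_vec() is unambiguous: it always allocates an owned Vec.
    let owned: Vec<usize> = group_key.to_vec();

    assert_eq!(still_borrowed, owned.as_slice());
}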

// First, evaluate the key selector expressions.
// If any error, we produce the errors as output and note
// the fact that the key was not correctly produced.
let datums = row.iter().take(columns_needed).collect::<Vec<_>>();
Contributor:

can we create the vector outside the closure (then clear/fill it from the iter inside) in order to reuse the allocation?

Contributor Author:

We cannot! At least not without a more advanced implementation. Roughly: the lifetime in there is 'storage, which is the lifetime of the &Row we get to look at and of the associated Datum values, and it only comes into existence inside the flat_map_ref call. With some more work (I tried, and I think walked away from it) you can stash a vector like that and then offer it for reuse, but it's horrible in the "not an arrangement" case because there is no such storage, and then you need to re-allocate a vector for each Row.

I'll put a note that we could look into it though!
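A simplified standalone illustration of the constraint (plain slices in place of &Row and Datum, so this is only a sketch of the lifetime issue, not the actual operator code): the borrows handed to the closure carry a fresh lifetime per invocation, so a buffer of those borrows can be built and dropped inside each call, but cannot be hoisted into state the closure owns.

fn call_per_row<F>(mut f: F)
where
    F: for<'a> FnMut(&'a [u8]),
{
    let row_a = vec![1u8, 2, 3];
    f(row_a.as_slice());
    let row_b = vec![4u8, 5];
    f(row_b.as_slice());
}

fn main() {
    // Works: a fresh Vec of borrows per invocation, dropped before the call returns.
    call_per_row(|row| {
        let datums: Vec<&u8> = row.iter().take(2).collect();
        println!("{:?}", datums);
    });

    // Hoisting a reusable buffer of borrows does not compile: `row` has a
    // fresh, higher-ranked lifetime on each call, which a captured Vec<&u8>
    // with a fixed element lifetime cannot name.
    //
    // let mut buf: Vec<&u8> = Vec::new();
    // call_per_row(|row| { buf.clear(); buf.extend(row.iter().take(2)); });
}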

}

let mut row_packer = RowPacker::new();
let (key_input, mut err_input): (
Contributor:

I don't understand why this is called key_input despite computing both the key and the value.

Contributor Author:

I'm sure not for any great reason. Will fix!


// Demux out the potential errors from key and value selector evaluation.
use timely::dataflow::operators::ok_err::OkErr;
let (ok, err) = key_input.inner.ok_err(|(x, t, d)| match x {
Contributor:

FYI, the comment here is wrong (the closure returns a Result, not a bool).
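For reference, a minimal standalone sketch of ok_err routing by the Ok/Err arms of the returned Result (toy values, not this PR's closure):

use timely::dataflow::operators::ok_err::OkErr;
use timely::dataflow::operators::{Inspect, ToStream};

fn main() {
    timely::example(|scope| {
        // The closure returns a Result, not a bool: Ok values flow to the
        // first output stream, Err values to the second.
        let (evens, odds) = (0..10)
            .to_stream(scope)
            .ok_err(|x| if x % 2 == 0 { Ok(x) } else { Err(x) });
        evens.inspect(|x| println!("ok:  {}", x));
        odds.inspect(|x| println!("err: {}", x));
    });
}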

Contributor Author:

Oh yeah that comment was totally copy/pasted (badly) from branch.rs. Oops. Thanks!


@frankmcsherry merged commit 3389ffe into MaterializeInc:master Jun 1, 2020
@frankmcsherry deleted the improve_render branch February 11, 2022 03:39