Sketch for aggregation intermediate results blocked management #11943
    | Thank you @Rachelint -- I hope to look at this more carefully tomorrow | 
| The benchmark after impl blocked version  And after we impl blocked version for all   | 
| 
 What is the difference between blocked approach and Emit::First with block size? At the end, there are only AllBlocks and FirstBlocks? | 
| let _ = self.update_memory_reservation(); | ||
| let batch = RecordBatch::try_new(schema, output)?; | ||
| Ok(batch) | ||
| let batches = outputs | 
| let batches = outputs | |
| outputs | |
| .into_iter() | |
| .map(|o| { | |
| RecordBatch::try_new(Arc::clone(&schema), o) | |
| .map_err(|e| DataFusionError::ArrowError(e, None)) | |
| }) | |
| .collect::<Result<VecDeque<_>>>() | 
This code may be stale now.
| .into_iter() | ||
| .map(|o| { | ||
| RecordBatch::try_new(Arc::clone(&schema), o) | ||
| .map_err(|e| DataFusionError::ArrowError(e, None)) | 
use macro for error
This code may be stale now.
| /// scratch space for the current input [`RecordBatch`] being | ||
| /// processed. Reused across batches here to avoid reallocations | ||
| current_group_indices: Vec<usize>, | ||
| current_group_indices: Vec<u64>, | 
What is the reason to use u64 instead of usize?
> what is the reason to use u64 instead of usize
I think an explicit u64 is needed once we implement the blocked versions, because we need to split group_idx into two parts:
- the high 32 bits represent the block id
- the low 32 bits represent the block offset
To reuse the same current_group_indices buffer in both flat and blocked mode, I changed all related group_idx values to u64.
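A minimal sketch of the split described above, assuming a 32/32 bit layout as stated in the comment. The constant and function names are hypothetical and are not taken from the PR.

```rust
/// Number of low bits that hold the offset inside a block
/// (assumed to be 32, following the comment above).
const BLOCK_OFFSET_BITS: u32 = 32;
const BLOCK_OFFSET_MASK: u64 = (1u64 << BLOCK_OFFSET_BITS) - 1;

/// Pack a block id and an offset within that block into one u64 group index.
fn pack_group_index(block_id: u32, block_offset: u32) -> u64 {
    ((block_id as u64) << BLOCK_OFFSET_BITS) | block_offset as u64
}

/// Split a packed group index back into (block_id, block_offset).
fn unpack_group_index(group_idx: u64) -> (u32, u32) {
    let block_id = (group_idx >> BLOCK_OFFSET_BITS) as u32;
    let block_offset = (group_idx & BLOCK_OFFSET_MASK) as u32;
    (block_id, block_offset)
}
```

With this layout, a packed index whose block id is 0 is numerically equal to its offset, which is one way the same u64 buffer could serve the flat mode as well.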
Another alternative might be to ensure that the block size is aligned across the aggregators and group values -- that way there would be no stitching arrays together into batches during emission
| @jayzhan211 Yes, there are still no detailed blocked impls; for now I only ran the benchmark to ensure the sketch does not decrease performance. I guess the blocked approach may be not related much to  Actually the blocked approach's main target is that it manages the data block by block in the  | 
| I think I'm not so familiar with the Emit::First and there is no block implementation done yet. Could we emit every block size of values we have? Something like Emit::First(block size). We have
    fn emit_early_if_necessary(&mut self) -> Result<()> {
        if self.group_values.len() >= self.batch_size
            && matches!(self.group_ordering, GroupOrdering::None)
            && matches!(self.mode, AggregateMode::Partial)
            && self.update_memory_reservation().is_err()
        {
            let n = self.group_values.len() / self.batch_size * self.batch_size;
            let batch = self.emit(EmitTo::First(n), false)?;
            self.exec_state = ExecutionState::ProducingOutput(batch);
        }
        Ok(())
    }
If we emit every block size we accumulated, is it something similar to the block approach? If not, what is the difference? Upd: One difference I can think of is that in the block approach, we have all the accumulated values and can optimize based on all of them, while in Emit::First mode we emit partial values early, so we lose the chance to optimize based on all the values 🤔 ? | 
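To make the arithmetic in `emit_early_if_necessary` concrete, here is a small standalone sketch. It uses a plain `Vec` as a stand-in for the group state, and the helper names are hypothetical. The point it illustrates is that emitting the first `n` values from one contiguous buffer requires shifting the remaining values down.

```rust
/// Largest multiple of `batch_size` that is <= `num_groups`,
/// mirroring `len / batch_size * batch_size` in the snippet above.
fn full_batches_len(num_groups: usize, batch_size: usize) -> usize {
    assert!(batch_size > 0, "batch_size must be non-zero");
    num_groups / batch_size * batch_size
}

/// Stand-in for `EmitTo::First(n)` on a single contiguous buffer:
/// the first `n` values are returned and the rest are shifted to the front.
fn emit_first<T>(values: &mut Vec<T>, n: usize) -> Vec<T> {
    values.drain(..n).collect()
}
```

For example, with 10 accumulated groups and a batch size of 4, `full_batches_len` yields 8, and `emit_first` leaves the last 2 values behind after moving them to the start of the buffer.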
| 
 Ok, I think I got it now, if we constantly  But  
 And in others, we need to poll to end, then  
 And in such cases, the blocked approach may be efficient for both memory and CPU, as stated above? | 
| 
 I think maybe we should keep them both, the  The  When will the blocked mode be enabled maybe can see in: | 
Thank you @Rachelint -- I took a look at this PR and here is some feedback:
- I think it is important to spend time actually showing this approach makes some queries faster (e.g. we should try and update one accumulator and one implementation of groups to show it makes a difference)
- I think it is important to actually chunk saving the intermediate state (e.g. in a Vec<...> rather than ...) to realize the benefit of this chunked approach

Thank you for working on this. Very cool
| /// `update_batch` or `merge_batch` will be shifted down by | ||
| /// `n`. See [`EmitTo::First`] for more details. | ||
| fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef>; | ||
| fn evaluate(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>; | 
it would help to document what expectations are on the Vec of array refs
| /// Emit all groups managed by blocks | ||
| AllBlocks, | ||
| /// Emit only the first `n` group blocks, | ||
| /// similar as `First`, but used in blocked `GroupValues` and `GroupAccumulator`. | ||
| /// | ||
| /// For example, `n=3`, `block size=4`, finally 12 groups will be returned. | ||
| FirstBlocks(usize), | 
Rather than having two parallel emission modes for blocked output, I wonder if we could have some sort of "take" mode whose semantics did not shift the existing values down
For example, what if we introduced a notion of "block" across the group keys and aggregators
pub enum EmitTo {
    /// Same
    All,
    /// Same
    First(usize),
    /// Takes the N'th block of rows from this accumulator.
    /// It is an error to take the same block twice, or to emit `All` or `First`
    /// after any `TakeBlock(usize)`.
    TakeBlock(usize),
}
And then we would, for example, make sure the group values and aggregators all save data in blocks of 100K rows
Then to emit 1M rows, the accumulators would emit like
EmitTo::TakeBlock(0)
EmitTo::TakeBlock(1)
EmitTo::TakeBlock(2)
...
EmitTo::TakeBlock(9)
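A rough sketch of the "take" semantics proposed above, assuming the state is kept as a list of optional blocks. `BlockedValues` and its methods are hypothetical illustrations and not part of DataFusion. Taking a block leaves the other blocks in place, and taking the same block twice is reported as an error.

```rust
/// Hypothetical blocked accumulator state: each slot holds one block of values,
/// or `None` once that block has been taken.
struct BlockedValues {
    blocks: Vec<Option<Vec<i64>>>,
}

impl BlockedValues {
    /// Split flat values into fixed-size blocks (the last block may be shorter).
    fn from_values(values: Vec<i64>, block_size: usize) -> Self {
        assert!(block_size > 0, "block_size must be non-zero");
        let blocks = values
            .chunks(block_size)
            .map(|chunk| Some(chunk.to_vec()))
            .collect();
        Self { blocks }
    }

    /// Take block `idx` without moving any other block.
    /// Returns an error if the block does not exist or was already taken.
    fn take_block(&mut self, idx: usize) -> Result<Vec<i64>, String> {
        match self.blocks.get_mut(idx) {
            None => Err(format!("block {idx} does not exist")),
            Some(slot) => slot
                .take()
                .ok_or_else(|| format!("block {idx} was already taken")),
        }
    }
}
```

Because each block is moved out as a whole, no values are copied or shifted when a block is emitted, in contrast to `First(n)` on a single contiguous buffer.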
It may be a great idea for reducing code changes (we don't need to refactor the returned emit value from T to Vec).
But with Emit::TakeBlock(idx), it seems we need to record the current block id in the outer caller, which may be a bit complicated?
Could we just define Emit::CurrentBlock, and use the iterator approach to implement the AllBlocks and FirstBlocks modes defined now?
If this makes sense, I can try switching the sketch to this approach.
pub enum EmitTo {
    /// Same
    All,
    /// Same
    First(usize),
    /// Return the current block of rows from this accumulator
    /// We should always use this emit mode in blocked mode accumulator.
    CurrentBlock,
}
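One way to read the iterator approach: the accumulator keeps its blocks in a queue and hands out the front block on each call, so the caller does not need to track a block index. The sketch below is only an illustration under that assumption. `BlockQueue` and the helper functions are hypothetical names, and `VecDeque` stands in for the real state.

```rust
use std::collections::VecDeque;

/// Hypothetical blocked state that emits blocks front to back.
struct BlockQueue {
    blocks: VecDeque<Vec<i64>>,
}

impl BlockQueue {
    fn new(blocks: Vec<Vec<i64>>) -> Self {
        Self { blocks: blocks.into() }
    }

    /// Stand-in for `EmitTo::CurrentBlock`: pop the current (front) block,
    /// or return `None` when every block has been emitted.
    fn emit_current_block(&mut self) -> Option<Vec<i64>> {
        self.blocks.pop_front()
    }
}

/// "AllBlocks" expressed as repeated `CurrentBlock` emission.
fn emit_all_blocks(queue: &mut BlockQueue) -> Vec<Vec<i64>> {
    std::iter::from_fn(|| queue.emit_current_block()).collect()
}

/// "FirstBlocks(n)" expressed as at most `n` `CurrentBlock` emissions.
fn emit_first_blocks(queue: &mut BlockQueue, n: usize) -> Vec<Vec<i64>> {
    std::iter::from_fn(|| queue.emit_current_block())
        .take(n)
        .collect()
}
```

Under this reading, the two block-wise modes from the earlier sketch become thin wrappers around a single emission primitive.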
> use the iterator approach to impl AllBlocks and FirstBlocks defined now
I'm not sure how this works, but it looks like a neat idea. If we apply the same idea to "element" emission (First and All) and treat it as a special case with block_size = 1, I think we could end up with a pretty nice abstraction. Probably we just need EmitTo::Block(block_size) 🤔 However, that is too far away from now. 😆
🤔 Yes, the other emit modes can indeed be seen as cases with a specialized block size in the iterator approach. But for performance, it is better to keep batch_size == block_size.
After introducing the iterator approach, the sketch takes only 200+ lines of code, compared to 600+ in the stale version.
The main work is just adding a stream state, ExecutionState::ProducingBlocks(blocks).
https://github.com/Rachelint/arrow-datafusion/blob/d79d912d1677549c825cafc405911973ace0df46/datafusion/physical-plan/src/aggregates/row_hash.rs#L728
Maybe it can show how the blocked optimization works.
| where | ||
| F: Fn(bool, bool) -> bool + Send + Sync, | ||
| { | ||
| fn update_batch( | 
In order to realize the benefit of this blocked implementation, I think you will need to change the state of the accumulators so that, instead of a single large buffer
    /// values per group
    values: BooleanBufferBuilder,
the state is held in chunks like
    /// blocks of values per group
    values: Vec<BooleanBufferBuilder>
(or possibly this, to support taking them out individually)
    /// blocks of values per group, None when taken
    values: Vec<Option<BooleanBufferBuilder>>
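As a rough illustration of the chunked state suggested above, the sketch below keeps per-group boolean values in fixed-size blocks and allocates a new block when more groups arrive, instead of growing one large buffer. It uses `Vec<bool>` as a simplified stand-in for `BooleanBufferBuilder`, and `BlockedBools` is a hypothetical name.

```rust
/// Hypothetical blocked state: per-group boolean values stored in fixed-size blocks.
struct BlockedBools {
    block_size: usize,
    blocks: Vec<Vec<bool>>,
}

impl BlockedBools {
    fn new(block_size: usize) -> Self {
        assert!(block_size > 0, "block_size must be non-zero");
        Self { block_size, blocks: Vec::new() }
    }

    /// Make `total_groups` groups addressable by appending whole blocks.
    /// Existing blocks are never reallocated or copied.
    fn resize(&mut self, total_groups: usize, default: bool) {
        let needed_blocks = (total_groups + self.block_size - 1) / self.block_size;
        while self.blocks.len() < needed_blocks {
            self.blocks.push(vec![default; self.block_size]);
        }
    }

    /// Set the value for a flat group index by locating its block and offset.
    fn set(&mut self, group_idx: usize, value: bool) {
        let block = group_idx / self.block_size;
        let offset = group_idx % self.block_size;
        self.blocks[block][offset] = value;
    }

    /// Read the value for a flat group index.
    fn get(&self, group_idx: usize) -> bool {
        self.blocks[group_idx / self.block_size][group_idx % self.block_size]
    }
}
```

Here the block and offset are derived from a flat index by division. A packed index, as discussed elsewhere in this thread, would avoid the division at the cost of a wider index type.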
Yes, I agree.
This sketch originally wanted to support combinations like:
- Single GroupValues + single GroupAccumulator
- Blocked GroupValues + single GroupAccumulator
- Blocked GroupValues + blocked GroupAccumulator
But after consideration, it may just make the code too complicated, and may not bring an obvious improvement in the Blocked GroupValues + single GroupAccumulator mode (the constant slice calls would still exist, or even more expensive operations would be introduced if we implemented it without slice).
| 
 I still don't understand this  Also thanks for pushing this forward, I think this approach is promising for performance | 
| @2010YOUY01 makes sense, it seems  But the  For example: 
 Such a group movement will obviously lead to a lot of CPU cost... Actually we can remove the  For example, in  But I think if we implement it like this, it will be very confusing, so to make it clear, I introduce the two new blocked emission modes  | 
| 
 Thanks, I have finished a blocked style  | 
| @2010YOUY01 After checking the code for memory control, I think I got it. 
 They all serve for the spilling. And the logic may be like this: 
 | 
| 
 Thanks, now I figured out the high-level idea of spilling in aggregation and how  However there exists other code that does early emit in aggregation, and I'm still trying to figure out how they work, do you have any pointer for that? I'm guessing it's used in streaming aggregation or some pushed-down limits datafusion/datafusion/physical-plan/src/aggregates/row_hash.rs Lines 605 to 611 in 482ef45 
 | 
| 
 Yes, you are right, there are two early emission cases, one is for spilling mentioned above, and another here is about streaming. | 
    | 
 Sorry for the delay -- I am back now full time and will review this PR over the next few days | 
| Marking as draft as I don't think this is waiting on review and I am trying to keep the review backlog under control | 
| Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days. | 
| The code here is stale; I will submit a new PR when I start pushing this forward. | 
| /// we allocate a new block (also with the same predefined block size based capacity) | ||
| // instead of expanding the current one and copying the data. | ||
| /// We plan to make this the default in the future when tests are enough. | ||
| pub enable_aggregation_intermediate_states_blocked_approach: bool, default = false | 
Making it default / not a config should reduce complexity...
I am just not sure if we should enable it by default at the beginning.
But as I see, current tests may be enough to keep the correctness of this optimization?
I think coverage is high enough, so if we can show that the tests pass and the benchmarks don't show (large) regressions, there is no real downside to enabling it.
Maybe we can add some extra tests for e.g. testing the reduced memory usage with this approach.
This PR is too stale to continue developing; I am pushing it forward in #15591 (mainly copying the necessary code from here).
Perfect! Let's close this one
Which issue does this PR close?
Part of #11931 , part of #7065
Rationale for this change
Details can be found in #11931.
Are these changes tested?
By existing tests.
Are there any user-facing changes?
Two functions are added to the GroupValues and GroupAccumulator traits.