Optimize coalesce kernel for StringView (10-50% faster) #7650
Conversation
| Update here is that this PR is quite a bit slower than main when copying many string values. I am looking into why. | 
      
        
| batch_size: usize, | ||
| /// In-progress buffered batches | ||
| buffer: Vec<RecordBatch>, | ||
| /// In-progress arrays | 
The main change of this PR is to introduce per-type "InProgressArrays" that have specializations for each type of array
This PR adds a specialization for StringViewArray (to remove the need for gc_string_view_batch)
In follow on PRs (maybe tickets) we can add specialized versions for PrimitiveArray, StringArray, etc
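A minimal sketch of the per-type idea, under loud assumptions: `Vec<i64>` stands in for an Arrow array, and the trait `InProgress` with the types `BufferedFallback` and `DirectCopy` are hypothetical names for illustration, not the types in this PR. It only contrasts "buffer everything and concat at the end" with "append directly into the output".

```rust
/// Hypothetical, simplified stand-in for a per-type in-progress array.
/// `Vec<i64>` plays the role of an Arrow array in this sketch.
trait InProgress {
    /// Copy `len` rows starting at `offset` from `source` into the output being built.
    fn copy_rows(&mut self, source: &[i64], offset: usize, len: usize);
    /// Return the finished output and reset the internal state.
    fn finish(&mut self) -> Vec<i64>;
}

/// Fallback: keep a piece per input and concatenate only in `finish`.
/// At `finish` time both the pieces and the concatenated result are alive.
/// (This sketch copies each piece; a real array slice could be zero-copy.)
#[derive(Default)]
struct BufferedFallback {
    pieces: Vec<Vec<i64>>,
}

impl InProgress for BufferedFallback {
    fn copy_rows(&mut self, source: &[i64], offset: usize, len: usize) {
        self.pieces.push(source[offset..offset + len].to_vec());
    }

    fn finish(&mut self) -> Vec<i64> {
        std::mem::take(&mut self.pieces).concat()
    }
}

/// Specialization: append rows straight into the final output buffer.
struct DirectCopy {
    batch_size: usize,
    out: Vec<i64>,
}

impl DirectCopy {
    fn new(batch_size: usize) -> Self {
        Self { batch_size, out: Vec::with_capacity(batch_size) }
    }
}

impl InProgress for DirectCopy {
    fn copy_rows(&mut self, source: &[i64], offset: usize, len: usize) {
        self.out.extend_from_slice(&source[offset..offset + len]);
    }

    fn finish(&mut self) -> Vec<i64> {
        std::mem::replace(&mut self.out, Vec::with_capacity(self.batch_size))
    }
}

fn main() {
    let input = [1, 2, 3, 4, 5];
    let mut builders: Vec<Box<dyn InProgress>> = vec![
        Box::new(BufferedFallback::default()),
        Box::new(DirectCopy::new(4)),
    ];
    for b in builders.iter_mut() {
        b.copy_rows(&input, 0, 2);
        b.copy_rows(&input, 3, 2);
        println!("{:?}", b.finish());
    }
}
```

Both implementations produce the same rows; the difference is only in how much intermediate data is held before `finish`.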
        
          
arrow-select/src/coalesce.rs (Outdated)
          
        
      | } | ||
|  | ||
| fn finish(&mut self) -> Result<ArrayRef, ArrowError> { | ||
| // Concatenate all buffered arrays into a single array, which uses 2x | 
The default fallback does the same as the existing coalesce kernel and buffers all the input arrays until it is time to output and then calls concat
Over time I hope to replace this memory inefficient algorithm with one that is both more memory efficient and faster
| /// However, after a while (e.g., after `FilterExec` or `HashJoinExec`) the | ||
| /// `StringViewArray` may only refer to a small portion of the buffer, | ||
| /// significantly increasing memory usage. | ||
| fn gc_string_view_batch(batch: RecordBatch) -> RecordBatch { | 
The logic of gc_string_view_batch has been incorporated into InProgressStringViewArray, and thus saves a (third!!!)  copy of the strings for some StringViewArrays
        
          
arrow-select/src/coalesce.rs (Outdated)
          
        
      | /// See [`Self::next_completed_batch()`] to retrieve any completed batches. | ||
| pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError> { | ||
| pub fn push_batch(&mut self, mut batch: RecordBatch) -> Result<(), ArrowError> { | ||
| if batch.num_rows() == 0 { | 
Maybe consider not trying to coalesce the batch whenever it has more than 1/2 (or some other factor) of rows of batch_size?
Probably best to have it configurable, as it depends on the usage if this is a good idea or not.
This logic was once in DataFusion, but is somehow gone 🤔
Thanks -- that is a good idea -- I need to review what, if anything, assumes perfectly sized target batches...
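The suggestion above could look roughly like the following. This is only a sketch: the function name `should_pass_through`, the `buffered_rows` parameter, and the `min_fill` knob are assumptions for illustration and are not part of this PR or of any existing API.

```rust
/// Hypothetical helper: decide whether an incoming batch is already large
/// enough to be emitted as-is instead of being copied into the in-progress
/// output. `min_fill` is an assumed tuning knob, e.g. 0.5 for
/// "more than half of `batch_size`".
fn should_pass_through(
    num_rows: usize,
    batch_size: usize,
    buffered_rows: usize,
    min_fill: f64,
) -> bool {
    // Only pass through when nothing is buffered; otherwise the buffered rows
    // would first have to be flushed as an undersized batch to keep row order.
    buffered_rows == 0 && (num_rows as f64) > (batch_size as f64) * min_fill
}

fn main() {
    let batch_size = 8192;
    for rows in [100, 4096, 5000] {
        println!(
            "{rows} rows -> pass through: {}",
            should_pass_through(rows, batch_size, 0, 0.5)
        );
    }
}
```

Whether this is a good idea depends on the caller, since anything downstream that assumes exactly `batch_size` rows per output batch would see smaller batches.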
      
        
| I removed the use of  I am currently wondering if it is related to caching effects. I am going to try and change the benchmark so it regenerates the inputs each time rather than pre-computing them | 
      
        
| Well, I got some of the performance back but I still can't explain why it is 10% slower. The only thing I can come up with is that the coalesce kernel is slower due to allocations or something. Maybe I can show that I can make up the difference with some other optimization. | 
| Ok, I figured out what is going on and why the  This is right under the cutoff load factor (0.5) that would force a copy of the strings into new buffers. However, on this branch, because the GC happens after the input is sliced, the overall load factor is smaller, which triggers the GC in some cases. If I hard code the gc heuristic to be different:
index 0be8702c1b..5e4695dd7e 100644
--- a/arrow-select/src/coalesce/byte_view.rs
+++ b/arrow-select/src/coalesce/byte_view.rs
@@ -290,7 +290,7 @@ impl<B: ByteViewType> InProgressArray for InProgressByteViewArray<B> {
         // Copying the strings into a buffer can be time-consuming so
         // only do it if the array is sparse
-        if actual_buffer_size > (ideal_buffer_size * 2) {
+        if actual_buffer_size > (ideal_buffer_size * 100) {
             self.append_views_and_copy_strings(s.views(), ideal_buffer_size, buffers);
         } else {
             self.append_views_and_update_buffer_index(s.views(), buffers);
The performance for this benchmark is the same as on main. I am thinking about how best to fix this. | 
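To make the heuristic in the diff concrete, here is a standalone sketch of the load-factor check. It assumes views are `u128` values whose low 32 bits hold the string length (as in the `(*v as u32) as usize` snippet in this PR); the function names `ideal_buffer_size` and `should_copy_strings` are illustrative and not taken from the PR.

```rust
/// Bytes needed if only the strings actually referenced by `views` were
/// stored in buffers. Strings of 12 bytes or fewer live inline in the view
/// and need no buffer space.
fn ideal_buffer_size(views: &[u128]) -> usize {
    views
        .iter()
        .map(|v| {
            // The low 32 bits of a view hold the string length.
            let len = (*v as u32) as usize;
            if len > 12 { len } else { 0 }
        })
        .sum()
}

/// Mirrors the `actual_buffer_size > ideal_buffer_size * 2` check from the
/// diff: copy the strings only when less than half of the buffer bytes are
/// referenced (load factor below 0.5).
fn should_copy_strings(views: &[u128], actual_buffer_size: usize) -> bool {
    actual_buffer_size > ideal_buffer_size(views) * 2
}

fn main() {
    // Only the length bits are set here; the other view fields are left at
    // zero because this check does not read them.
    let views: Vec<u128> = vec![5, 20, 100];
    println!("ideal = {}", ideal_buffer_size(&views));
    println!("copy at 200 bytes? {}", should_copy_strings(&views, 200));
    println!("copy at 1000 bytes? {}", should_copy_strings(&views, 1000));
}
```

Slicing the input before this check lowers the referenced byte count while the buffer size stays the same, which is how an array just above the 0.5 cutoff can end up below it.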
| This is the last remaining one. I could reproduce a very small delta locally (I measured 5-7% slower). I am pretty sure I can make it up when I bring the filtering into the coalesce kernel (which will stop making intermediate Arrays), so I would like to proceed with this PR. I will polish it up and make sure it is ready for review tomorrow morning. | 
| I think this one is ready for review now | 
| .iter() | ||
| .map(|v| { | ||
| let len = (*v as u32) as usize; | ||
| if len > 12 { | 
We probably should set this as constant somewhere and use it
This is a good idea and I will do it in a follow on PR
| Thanks again for all the help @Dandandan -- I'll try and make a few more PRs to this kernel for primitive arrays as well as optimizing the filtering shortly | 
| pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError> { | ||
| if batch.num_rows() == 0 { | ||
| // If the batch is empty, we don't need to do anything | ||
| let (_schema, arrays, mut num_rows) = batch.into_parts(); | 
This is the core logic here -- to break up the RecordBatch and incrementally copy rows into the target output.
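The incremental copy described here can be sketched with plain vectors. This is a simplified model, not the PR's code: `Coalescer` is a hypothetical type, `Vec<i32>` stands in for a `RecordBatch`, and only the method names `push_batch` and `next_completed_batch` are borrowed from the snippets above.

```rust
use std::collections::VecDeque;

/// Simplified model of the coalescer: rows are `i32`s, batches are vectors.
struct Coalescer {
    batch_size: usize,
    in_progress: Vec<i32>,
    completed: VecDeque<Vec<i32>>,
}

impl Coalescer {
    fn new(batch_size: usize) -> Self {
        assert!(batch_size > 0, "batch_size must be non-zero");
        Self {
            batch_size,
            in_progress: Vec::with_capacity(batch_size),
            completed: VecDeque::new(),
        }
    }

    /// Copy rows from `batch` into the in-progress output, emitting a
    /// completed batch every time `batch_size` rows have accumulated.
    fn push_batch(&mut self, batch: &[i32]) {
        let mut offset = 0;
        let mut remaining = batch.len();
        while remaining > 0 {
            let space = self.batch_size - self.in_progress.len();
            let n = remaining.min(space);
            self.in_progress.extend_from_slice(&batch[offset..offset + n]);
            offset += n;
            remaining -= n;
            if self.in_progress.len() == self.batch_size {
                let full = std::mem::replace(
                    &mut self.in_progress,
                    Vec::with_capacity(self.batch_size),
                );
                self.completed.push_back(full);
            }
        }
    }

    fn next_completed_batch(&mut self) -> Option<Vec<i32>> {
        self.completed.pop_front()
    }
}

fn main() {
    let mut c = Coalescer::new(4);
    c.push_batch(&[1, 2, 3]);
    c.push_batch(&[4, 5, 6, 7, 8, 9]);
    while let Some(batch) = c.next_completed_batch() {
        println!("{batch:?}");
    }
    println!("still buffered: {:?}", c.in_progress);
}
```

The point of the design is that input rows are copied once, directly into the output being built, rather than held until a later concat.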
# Which issue does this PR close?

As suggested by @Dandandan in #7650 (comment):

> We probably should set this as constant somewhere and use it

# Rationale for this change

Using a symbolic constant in the code rather than a hard coded constant makes it easier to:

1. Understand what the value means
2. Link / attach documentation to the constant to provide context

# What changes are included in this PR?

1. Introduce `MAX_INLINE_VIEW_LEN` constant for string/byte views
2. Update code to use that instead of `12`

# Are there any user-facing changes?

A new constant
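As an illustration of what such a constant buys, here is a small standalone sketch. The constant name `MAX_INLINE_VIEW_LEN` comes from the follow-on PR description; its `u32` type here, and the helpers `view_len`, `is_inline`, and `make_inline_view`, are assumptions made for this example. The layout used (length in the first 4 little-endian bytes, up to 12 data bytes after it) follows the Arrow view format for short strings.

```rust
/// Longest value, in bytes, that fits entirely inside a 16-byte view.
/// (The `u32` type is an assumption of this sketch.)
const MAX_INLINE_VIEW_LEN: u32 = 12;

/// The low 32 bits of a view hold the length of the value.
fn view_len(view: u128) -> u32 {
    view as u32
}

/// True if the value is stored in the view itself rather than in a buffer.
fn is_inline(view: u128) -> bool {
    view_len(view) <= MAX_INLINE_VIEW_LEN
}

/// Hypothetical helper: build an inline view for a short value, or return
/// `None` if the value is too long to be inlined.
fn make_inline_view(data: &[u8]) -> Option<u128> {
    if data.len() > MAX_INLINE_VIEW_LEN as usize {
        return None;
    }
    let mut bytes = [0u8; 16];
    // First 4 bytes: length. Remaining 12 bytes: the data, zero padded.
    bytes[..4].copy_from_slice(&(data.len() as u32).to_le_bytes());
    bytes[4..4 + data.len()].copy_from_slice(data);
    Some(u128::from_le_bytes(bytes))
}

fn main() {
    let short = make_inline_view(b"hello").expect("fits inline");
    println!("len = {}, inline = {}", view_len(short), is_inline(short));
    println!(
        "long value inlined? {}",
        make_inline_view(b"a string longer than twelve bytes").is_some()
    );
}
```

With the constant in place, checks like `len > 12` read as `len > MAX_INLINE_VIEW_LEN`, and the documentation for the limit lives in one spot.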
Which issue does this PR close?
Rationale for this change
Currently the coalesce kernel buffers views / data until there are enough rows and then concat's the results together. StringViewArrays can be even worse as there is a second copy in gc_string_view_batch.
This is wasteful because it
We can make it faster and more memory efficient by directly creating the output array
What changes are included in this PR?
StringViewArray without buffering
Note this PR does NOT (yet) add specialized filtering -- instead it focuses on reducing the overhead of appending views by not copying them (again!) with gc_string_view_batch
Open questions:
The differences are that the
Are there any user-facing changes?
The kernel is faster, no API changes