feat: support push batch direct to completed and add biggest coalesce batch support #8146

Conversation
```rust
/// assert_eq!(completed_batch, expected_batch);
/// ```
pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError> {
    if let Some(limit) = self.biggest_coalesce_batch_size {
```
@Dandandan: I wonder if this only makes sense when there are no in-progress batches? So whenever we get a large batch and we do not need to copy, just output the batch; in other cases, continue with the default path.
@alamb: I don't fully understand this comment -- I think the code in this PR is a no-op when there are no in-progress batches (because finish_buffered_batch will not do anything).
@zhuqi-lucas: Thank you @Dandandan and @alamb for the review!

> I wonder if this only makes sense when there are no in-progress batches? So whenever we get a large batch and we do not need to copy, just output the batch; in other cases, continue with the default path.

> I don't fully understand this comment -- I think the code in this PR is a no-op when there are no in-progress batches (because finish_buffered_batch will not do anything).

Yeah, actually we already avoid touching the in-progress batches here: we return early, flush the buffer, and move the large batch directly to completed without copying it into the in-progress buffer. So I think there is already no copy here? Please correct me if I am missing something, thanks!
@Dandandan: I mean that if there are already smaller batches in the in-progress array and the condition holds, the array will be created and the in-progress batches will be copied, producing two batches.

I think it would be worth testing only taking the short path when the condition holds and there are no (smaller) in-progress batches, so that only one output batch is generated.

Also, thinking about it, we could as well avoid flushing the in-progress array if we do not require the output to be sorted (so smaller in-progress batches would still be combined into an output batch of the target size).
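A minimal sketch of the guard being suggested here, with batches modeled as plain row counts: `State`, `try_bypass`, and the field names are illustrative, not the actual `coalesce.rs` internals.

```rust
use std::collections::VecDeque;

/// Illustrative only: a batch is represented by its row count.
struct State {
    buffer: Vec<usize>,          // in-progress small batches
    completed: VecDeque<usize>,  // finished output batches
}

impl State {
    /// The suggested short path: bypass only when nothing is buffered,
    /// so the bypass never forces a partial batch out ahead of it.
    fn try_bypass(&mut self, batch_rows: usize, limit: usize) -> bool {
        if batch_rows > limit && self.buffer.is_empty() {
            self.completed.push_back(batch_rows); // one untouched output batch
            return true;
        }
        false // caller falls through to the default coalescing path
    }
}

fn main() {
    let mut s = State { buffer: vec![], completed: VecDeque::new() };
    assert!(s.try_bypass(700, 500)); // empty buffer: short path applies
    s.buffer.push(300);
    assert!(!s.try_bypass(700, 500)); // smaller batches in progress: default path
}
```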
@zhuqi-lucas:

> Also, thinking about it, we could as well avoid flushing the in-progress array if we do not require the output to be sorted (so smaller in-progress batches would still be combined into an output batch of the target size).

Great point @Dandandan, I agree this is a TODO. I was also thinking that even if we skip coalescing a large batch, we still output the smaller batches buffered before it. If we don't need sorted output we could combine all the smaller batches, but the logic needs more investigation (how to define and obtain the sort flag at this low level, etc.).

> I mean that if there are already smaller batches in the in-progress array and the condition holds, the array will be created and the in-progress batches will be copied, producing two batches.

> I think it would be worth testing only taking the short path when the condition holds and there are no (smaller) in-progress batches, so that only one output batch is generated.

Interesting. If I understand correctly, there are two cases. For example, with a target batch size of 1000 and these input batches:
- 20, 20, 700, 20, 30, 700, 30, 20, 10, 15
- 20, 20, 30, 700, 600, 700, 900, 700, 600

We can only apply the short path to consecutive large batches (case 2), because by the time we meet consecutive large batches there are no in-progress batches, so we can output them directly. But for case 1 we would not apply the short path and would keep coalescing, because before each large batch there are already small batches in progress. Am I right? I am not sure about the performance impact; I need to test it with DataFusion, thanks!
@zhuqi-lucas:

> I mean that if there are already smaller batches in the in-progress array and the condition holds, the array will be created and the in-progress batches will be copied, producing two batches.

> I think it would be worth testing only taking the short path when the condition holds and there are no (smaller) in-progress batches, so that only one output batch is generated.

Addressed the above comments in the latest PR.

> Also, thinking about it, we could as well avoid flushing the in-progress array if we do not require the output to be sorted (so smaller in-progress batches would still be combined into an output batch of the target size).

Also added a TODO for this point; I can create a follow-up ticket after this PR, thanks!
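For reference, a sketch of what that follow-up idea could look like: if the output does not need to stay sorted, a large batch can jump ahead while the smaller in-progress batches stay buffered until they fill a target-size batch. Row counts only; every name here is hypothetical and this is not part of this PR.

```rust
/// Illustrative model of the possible no-sort optimization (not in this PR).
struct UnorderedCoalescer {
    target: usize,          // target output batch size
    limit: usize,           // large-batch bypass threshold
    buffered: usize,        // rows currently buffered
    completed: Vec<usize>,  // sizes of completed output batches
}

impl UnorderedCoalescer {
    fn push(&mut self, rows: usize) {
        if rows > self.limit {
            // Reorders output: the large batch is emitted immediately,
            // without flushing the buffered small batches first.
            self.completed.push(rows);
            return;
        }
        self.buffered += rows;
        while self.buffered >= self.target {
            self.completed.push(self.target);
            self.buffered -= self.target;
        }
    }
}

fn main() {
    let mut c = UnorderedCoalescer { target: 1000, limit: 500, buffered: 0, completed: Vec::new() };
    for rows in [350, 200, 800, 450] {
        c.push(rows);
    }
    // The 800-row batch bypasses while 350+200 stay buffered; the later
    // 450 rows top the buffer up to a full 1000-row output batch.
    assert_eq!(c.completed, vec![800, 1000]);
}
```

With the sorted-output behaviour in this PR, the same input would instead flush the 550 buffered rows as a partial batch before emitting the 800-row batch.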
@zhuqi-lucas: Current behaviour for this PR:

```rust
// Large batch bypass optimization:
// When biggest_coalesce_batch_size is configured and a batch exceeds this limit,
// we can avoid expensive split-and-merge operations by passing it through directly.
//
// IMPORTANT: This optimization is OPTIONAL and only active when biggest_coalesce_batch_size
// is explicitly set via with_biggest_coalesce_batch_size(Some(limit)).
// If not set (None), ALL batches follow normal coalescing behavior regardless of size.
// =============================================================================
// CASE 1: No buffer + large batch → Direct bypass
// =============================================================================
// Example scenario (target_batch_size=1000, biggest_coalesce_batch_size=Some(500)):
// Input sequence: [600, 1200, 300]
//
// With biggest_coalesce_batch_size=Some(500) (optimization enabled):
// 600 → large batch detected! buffered_rows=0 → Case 1: direct bypass
// → output: [600] (bypass, preserves large batch)
// 1200 → large batch detected! buffered_rows=0 → Case 1: direct bypass
// → output: [1200] (bypass, preserves large batch)
// 300 → normal batch, buffer: [300]
// Result: [600], [1200], [300] - large batches preserved, mixed sizes
// =============================================================================
// CASE 2: Buffer too large + large batch → Flush first, then bypass
// =============================================================================
// This case prevents creating extremely large merged batches that would
// significantly exceed both target_batch_size and biggest_coalesce_batch_size.
//
// Example 1: Buffer exceeds limit before large batch arrives
// target_batch_size=1000, biggest_coalesce_batch_size=Some(400)
// Input: [350, 200, 800]
//
// Step 1: push_batch([350])
// → batch_size=350 <= 400, normal path
// → buffer: [350], buffered_rows=350
//
// Step 2: push_batch([200])
// → batch_size=200 <= 400, normal path
// → buffer: [350, 200], buffered_rows=550
//
// Step 3: push_batch([800])
// → batch_size=800 > 400, large batch path
// → buffered_rows=550 > 400 → Case 2: flush first
// → flush: output [550] (combined [350, 200])
// → then bypass: output [800]
// Result: [550], [800] - buffer flushed to prevent oversized merge
//
// Example 2: Multiple small batches accumulate before large batch
// target_batch_size=1000, biggest_coalesce_batch_size=Some(300)
// Input: [150, 100, 80, 900]
//
// Step 1-3: Accumulate small batches
// 150 → buffer: [150], buffered_rows=150
// 100 → buffer: [150, 100], buffered_rows=250
// 80 → buffer: [150, 100, 80], buffered_rows=330
//
// Step 4: push_batch([900])
// → batch_size=900 > 300, large batch path
// → buffered_rows=330 > 300 → Case 2: flush first
// → flush: output [330] (combined [150, 100, 80])
// → then bypass: output [900]
// Result: [330], [900] - prevents merge into [1230] which would be too large
// =============================================================================
// CASE 3: Small buffer + large batch → Normal coalescing (no bypass)
// =============================================================================
// When buffer is small enough, we still merge to maintain efficiency
// Example: target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
// Input: [300, 1200]
//
// Step 1: push_batch([300])
// → batch_size=300 <= 500, normal path
// → buffer: [300], buffered_rows=300
//
// Step 2: push_batch([1200])
// → batch_size=1200 > 500, large batch path
// → buffered_rows=300 <= 500 → Case 3: normal merge
// → buffer: [300, 1200] (1500 total)
// → 1500 > target_batch_size → split: output [1000], buffer [500]
// Result: [1000], [500] - normal split/merge behavior maintained
// =============================================================================
// Comparison: Default vs Optimized Behavior
// =============================================================================
// target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
// Input: [600, 1200, 300]
//
// DEFAULT BEHAVIOR (biggest_coalesce_batch_size=None):
// 600 → buffer: [600]
// 1200 → buffer: [600, 1200] (1800 rows total)
// → split: output [1000 rows], buffer [800 rows remaining]
// 300 → buffer: [800, 300] (1100 rows total)
// → split: output [1000 rows], buffer [100 rows remaining]
// Result: [1000], [1000], [100] - all outputs respect target_batch_size
//
// OPTIMIZED BEHAVIOR (biggest_coalesce_batch_size=Some(500)):
// 600 → Case 1: direct bypass → output: [600]
// 1200 → Case 1: direct bypass → output: [1200]
// 300 → normal path → buffer: [300]
// Result: [600], [1200], [300] - large batches preserved
// =============================================================================
// Benefits and Trade-offs
// =============================================================================
// Benefits of the optimization:
// - Large batches stay intact (better for downstream vectorized processing)
// - Fewer split/merge operations (better CPU performance)
// - More predictable memory usage patterns
// - Maintains streaming efficiency while preserving batch boundaries
//
// Trade-offs:
// - Output batch sizes become variable (not always target_batch_size)
// - May produce smaller partial batches when flushing before large batches
// - Requires tuning biggest_coalesce_batch_size parameter for optimal performance
// TODO: for unsorted output, we might be able to pass all large batches
// straight through and coalesce all the small batches together.
```
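To make the three cases above concrete, here is a runnable, simplified model of the control flow. Batches are reduced to plain row counts, and the field and method names are illustrative rather than the actual `BatchCoalescer` internals:

```rust
struct Coalescer {
    target_batch_size: usize,
    biggest_coalesce_batch_size: Option<usize>,
    buffered_rows: usize,   // rows in the in-progress buffer
    completed: Vec<usize>,  // sizes of completed output batches
}

impl Coalescer {
    /// A no-op when nothing is buffered, which is why the bypass is safe.
    fn finish_buffered_batch(&mut self) {
        if self.buffered_rows > 0 {
            self.completed.push(self.buffered_rows);
            self.buffered_rows = 0;
        }
    }

    fn push_batch(&mut self, batch_rows: usize) {
        if let Some(limit) = self.biggest_coalesce_batch_size {
            if batch_rows > limit {
                // Case 2: oversized buffer -> flush it first
                if self.buffered_rows > limit {
                    self.finish_buffered_batch();
                }
                // Cases 1 and 2: empty buffer -> pass the large batch through
                if self.buffered_rows == 0 {
                    self.completed.push(batch_rows);
                    return;
                }
                // Case 3: small buffer -> fall through to normal coalescing
            }
        }
        // Default path: accumulate, splitting at target_batch_size
        self.buffered_rows += batch_rows;
        while self.buffered_rows >= self.target_batch_size {
            self.completed.push(self.target_batch_size);
            self.buffered_rows -= self.target_batch_size;
        }
    }
}

fn main() {
    let mut c = Coalescer {
        target_batch_size: 1000,
        biggest_coalesce_batch_size: Some(500),
        buffered_rows: 0,
        completed: Vec::new(),
    };
    for rows in [600, 1200, 300] {
        c.push_batch(rows); // matches the "optimized behaviour" trace above
    }
    assert_eq!(c.completed, vec![600, 1200]); // 300 rows remain buffered
}
```

Running the comparison input `[600, 1200, 300]` through this model reproduces the optimized trace above: `[600]` and `[1200]` bypass, and the 300 rows stay buffered until a final flush.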
@alamb:

> Also, thinking about it, we could as well avoid flushing the in-progress array if we do not require the output to be sorted (so smaller in-progress batches would still be combined into an output batch of the target size).

> Also added a TODO for this point; I can create a follow-up ticket after this PR, thanks!

I think this is a wise idea. I suspect there are many things in DataFusion that implicitly assume that rows are not reordered, so we will have to test such a change carefully.
@alamb: Thank you @zhuqi-lucas -- this looks great and very well tested. I have a few comments, and I think @Dandandan's comment is worth resolving as well, but otherwise I think this PR is ready to go.
arrow-select/src/coalesce.rs (outdated)

```rust
}

#[test]
fn test_biggest_coalesce_batch_size_constructor_method() {
```
@alamb: I don't really understand this test.
@zhuqi-lucas: Removed it in the latest PR; I also think it's not needed.
arrow-select/src/coalesce.rs (outdated)

```rust
}

#[test]
fn test_biggest_coalesce_batch_size_getter_setter() {
```
@alamb: I am not sure it is strictly necessary to test the getter/setter.
@zhuqi-lucas:

> I am not sure it is strictly necessary to test the getter/setter.

Thank you @alamb, I agree it's not needed; removed in the latest PR.
@zhuqi-lucas: Thank you @alamb for the review; addressed the comments in the latest PR. I will also wait for @Dandandan's feedback, thanks!
@zhuqi-lucas: Thank you @Dandandan @alamb, updated in the latest PR: added a TODO (maybe as a follow-up ticket) for the no-sort optimization; it will be more complex, but useful.
@alamb: Thanks (again) @zhuqi-lucas and @Dandandan. I pushed a few OCD comment updates to this PR, but I don't think they were strictly necessary; I just had the code checked out and couldn't help myself. This is really nice and a good example of careful optimization work 👨‍🍳 👌
@zhuqi-lucas: Great, thank you @alamb!
@alamb: Thanks again @zhuqi-lucas!
Which issue does this PR close?

Needed for apache/datafusion#17193.
Rationale for this change

Allow large batches to bypass the coalescer's expensive split-and-merge path and go directly to the completed output; this is needed for the DataFusion work linked above.

What changes are included in this PR?

Add more public API (see the usage sketch below), which is needed by Apache DataFusion.
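A hypothetical usage sketch of the new API: `with_biggest_coalesce_batch_size` is the builder method described in this PR, and the surrounding `BatchCoalescer` calls follow the existing arrow-select API, but the exact signatures should be checked against the merged code.

```rust
use std::sync::Arc;

use arrow_array::{Int32Array, RecordBatch};
use arrow_schema::{ArrowError, DataType, Field, Schema};
use arrow_select::coalesce::BatchCoalescer;

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

    // Target output size is 1000 rows, but batches larger than 500 rows
    // bypass coalescing and go straight to the completed queue.
    let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), 1000)
        .with_biggest_coalesce_batch_size(Some(500));

    for rows in [600_i32, 1200, 300] {
        let col = Int32Array::from_iter_values(0..rows);
        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(col)])?;
        coalescer.push_batch(batch)?;
    }
    coalescer.finish_buffered_batch()?;

    while let Some(batch) = coalescer.next_completed_batch() {
        // Expected sizes per the traces above: 600, 1200, 300
        println!("completed batch with {} rows", batch.num_rows());
    }
    Ok(())
}
```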
Are these changes tested?

Yes, added unit tests.

Are there any user-facing changes?

No