
Conversation

zhuqi-lucas
Contributor

@zhuqi-lucas zhuqi-lucas commented Aug 15, 2025

Which issue does this PR close?

Needed for:
apache/datafusion#17193

Rationale for this change

        // Large batch bypass optimization:
        // When biggest_coalesce_batch_size is configured and a batch exceeds this limit,
        // we can avoid expensive split-and-merge operations by passing it through directly.
        //
        // IMPORTANT: This optimization is OPTIONAL and only active when biggest_coalesce_batch_size
        // is explicitly set via with_biggest_coalesce_batch_size(Some(limit)).
        // If not set (None), ALL batches follow normal coalescing behavior regardless of size.

        // =============================================================================
        // CASE 1: No buffer + large batch → Direct bypass
        // =============================================================================
        // Example scenario (target_batch_size=1000, biggest_coalesce_batch_size=Some(500)):
        // Input sequence: [600, 1200, 300]
        //
        // With biggest_coalesce_batch_size=Some(500) (optimization enabled):
        //   600 → large batch detected! buffered_rows=0 → Case 1: direct bypass
        //        → output: [600] (bypass, preserves large batch)
        //   1200 → large batch detected! buffered_rows=0 → Case 1: direct bypass
        //         → output: [1200] (bypass, preserves large batch)
        //   300 → normal batch, buffer: [300]
        //   Result: [600], [1200], [300] - large batches preserved, mixed sizes

        // =============================================================================
        // CASE 2: Buffer too large + large batch → Flush first, then bypass
        // =============================================================================
        // This case prevents creating extremely large merged batches that would
        // significantly exceed both target_batch_size and biggest_coalesce_batch_size.
        //
        // Example 1: Buffer exceeds limit before large batch arrives
        // target_batch_size=1000, biggest_coalesce_batch_size=Some(400)
        // Input: [350, 200, 800]
        //
        // Step 1: push_batch([350])
        //   → batch_size=350 <= 400, normal path
        //   → buffer: [350], buffered_rows=350
        //
        // Step 2: push_batch([200])
        //   → batch_size=200 <= 400, normal path
        //   → buffer: [350, 200], buffered_rows=550
        //
        // Step 3: push_batch([800])
        //   → batch_size=800 > 400, large batch path
        //   → buffered_rows=550 > 400 → Case 2: flush first
        //   → flush: output [550] (combined [350, 200])
        //   → then bypass: output [800]
        //   Result: [550], [800] - buffer flushed to prevent oversized merge
        //
        // Example 2: Multiple small batches accumulate before large batch
        // target_batch_size=1000, biggest_coalesce_batch_size=Some(300)
        // Input: [150, 100, 80, 900]
        //
        // Step 1-3: Accumulate small batches
        //   150 → buffer: [150], buffered_rows=150
        //   100 → buffer: [150, 100], buffered_rows=250
        //   80  → buffer: [150, 100, 80], buffered_rows=330
        //
        // Step 4: push_batch([900])
        //   → batch_size=900 > 300, large batch path
        //   → buffered_rows=330 > 300 → Case 2: flush first
        //   → flush: output [330] (combined [150, 100, 80])
        //   → then bypass: output [900]
        //   Result: [330], [900] - prevents merge into [1230] which would be too large

        // =============================================================================
        // CASE 3: Small buffer + large batch → Normal coalescing (no bypass)
        // =============================================================================
        // When buffer is small enough, we still merge to maintain efficiency
        // Example: target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
        // Input: [300, 1200]
        //
        // Step 1: push_batch([300])
        //   → batch_size=300 <= 500, normal path
        //   → buffer: [300], buffered_rows=300
        //
        // Step 2: push_batch([1200])
        //   → batch_size=1200 > 500, large batch path
        //   → buffered_rows=300 <= 500 → Case 3: normal merge
        //   → buffer: [300, 1200] (1500 total)
        //   → 1500 > target_batch_size → split: output [1000], buffer [500]
        //   Result: [1000], [500] - normal split/merge behavior maintained

        // =============================================================================
        // Comparison: Default vs Optimized Behavior
        // =============================================================================
        // target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
        // Input: [600, 1200, 300]
        //
        // DEFAULT BEHAVIOR (biggest_coalesce_batch_size=None):
        //   600 → buffer: [600]
        //   1200 → buffer: [600, 1200] (1800 rows total)
        //         → split: output [1000 rows], buffer [800 rows remaining]
        //   300 → buffer: [800, 300] (1100 rows total)
        //        → split: output [1000 rows], buffer [100 rows remaining]
        //   Result: [1000], [1000], [100] - all outputs respect target_batch_size
        //
        // OPTIMIZED BEHAVIOR (biggest_coalesce_batch_size=Some(500)):
        //   600 → Case 1: direct bypass → output: [600]
        //   1200 → Case 1: direct bypass → output: [1200]
        //   300 → normal path → buffer: [300]
        //   Result: [600], [1200], [300] - large batches preserved

        // =============================================================================
        // Benefits and Trade-offs
        // =============================================================================
        // Benefits of the optimization:
        // - Large batches stay intact (better for downstream vectorized processing)
        // - Fewer split/merge operations (better CPU performance)
        // - More predictable memory usage patterns
        // - Maintains streaming efficiency while preserving batch boundaries
        //
        // Trade-offs:
        // - Output batch sizes become variable (not always target_batch_size)
        // - May produce smaller partial batches when flushing before large batches
        // - Requires tuning biggest_coalesce_batch_size parameter for optimal performance

        // TODO: for unsorted batches, we may be able to filter out all large batches
        // and coalesce all the small batches together?
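Condensed into code, the three cases above read roughly as follows. This is an illustrative sketch, not the PR's actual diff: the `Sketch` struct and the `emit_directly` / `flush_buffer` / `coalesce_normally` helpers are hypothetical stand-ins for the real coalescer internals.

```rust
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Minimal model of the coalescer state needed for the bypass decision.
struct Sketch {
    biggest_coalesce_batch_size: Option<usize>,
    buffered_rows: usize,
}

impl Sketch {
    fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError> {
        if let Some(limit) = self.biggest_coalesce_batch_size {
            if batch.num_rows() > limit {
                if self.buffered_rows == 0 {
                    // Case 1: nothing buffered -> pass the large batch through
                    return self.emit_directly(batch);
                }
                if self.buffered_rows > limit {
                    // Case 2: large buffer -> flush it first, then pass through
                    self.flush_buffer()?;
                    return self.emit_directly(batch);
                }
                // Case 3: small buffer -> fall through to normal coalescing
            }
        }
        self.coalesce_normally(batch)
    }

    // Stubs so the sketch compiles; the real implementation copies rows
    // into and out of the in-progress buffer.
    fn emit_directly(&mut self, _batch: RecordBatch) -> Result<(), ArrowError> {
        Ok(())
    }
    fn flush_buffer(&mut self) -> Result<(), ArrowError> {
        self.buffered_rows = 0;
        Ok(())
    }
    fn coalesce_normally(&mut self, batch: RecordBatch) -> Result<(), ArrowError> {
        self.buffered_rows += batch.num_rows();
        Ok(())
    }
}
```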

What changes are included in this PR?

Add more public APIs, which are needed for Apache DataFusion.
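As an illustration, here is a hedged usage sketch: `with_biggest_coalesce_batch_size` is the builder named in this PR, and the surrounding `BatchCoalescer` calls follow the existing arrow API, though the exact signatures may differ.

```rust
use std::sync::Arc;
use arrow::array::Int32Array;
use arrow::compute::BatchCoalescer;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));

    // target_batch_size = 1000; opt in to the large-batch bypass at 500 rows.
    let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), 1000)
        .with_biggest_coalesce_batch_size(Some(500));

    // A 600-row batch exceeds the 500-row limit while the buffer is empty,
    // so it should be emitted intact (Case 1 above).
    let big = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from_iter_values(0..600))],
    )?;
    coalescer.push_batch(big)?;
    assert_eq!(coalescer.next_completed_batch().unwrap().num_rows(), 600);
    Ok(())
}
```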

Are these changes tested?

Yes.

Added unit tests.

Are there any user-facing changes?

No

@github-actions github-actions bot added the arrow (Changes to the arrow crate) label Aug 15, 2025
@zhuqi-lucas zhuqi-lucas changed the title from "Draft: support push batch direct to completed (upstream needed for https://github.com/apache/datafusion/pull/17193)" to "feat: support push batch direct to completed as a public API" Aug 16, 2025
@zhuqi-lucas zhuqi-lucas changed the title from "feat: support push batch direct to completed as a public API" to "feat: support push batch direct to completed as a public API and add biggest coalesce batch support" Aug 16, 2025
/// assert_eq!(completed_batch, expected_batch);
/// ```
pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError> {
if let Some(limit) = self.biggest_coalesce_batch_size {
Contributor

I wonder if this only makes sense when there are no in-progress batches? So whenever we get a large batch and we do not need to copy, just output the batch; in other cases, go on with the default path.

Contributor

I don't fully understand this comment -- I think the code in this PR is a no-op when there are no in-progress batches (because finish_buffered_batch will not do anything)

Contributor Author

@zhuqi-lucas zhuqi-lucas Aug 16, 2025

Thank you @Dandandan and @alamb for the review!

I wonder if this only makes sense when there are no in-progress batches? So whenever we get a large batch and we do not need to copy, just output the batch; in other cases, go on with the default path.

I don't fully understand this comment -- I think the code in this PR is a no-op when there are no in-progress batches (because finish_buffered_batch will not do anything)

Yeah, actually here we already avoid the in-progress batch operation, because we return early, flush the buffer, and move the batch directly to completed without copying it into the in-progress buffer.

So I think there is already no copy here? I am not sure if I am missing something, please correct me, thanks!

Contributor

I mean that if there are already smaller batches in the in-progress array and the condition holds, the array will be created and the in-progress batches will be copied, creating two batches.
I think it would be worth testing only taking the short path when the condition holds and there are no (smaller) in-progress batches, so that only one output batch is generated.

Also, thinking about it, we could as well avoid flushing the in-progress array if we do not require the output to be sorted (so smaller in-progress batches would still be combined into an output batch of the target size).

Contributor Author

Also, thinking about it, we could as well avoid flushing the in-progress array if we do not require the output to be sorted (so smaller in-progress batches would still be combined into an output batch of the target size).

Great point @Dandandan, I agree this is a TODO. I was also thinking that even if we skip coalescing a large batch, we will still output the smaller batches before it. If we don't need sorting, we could combine all the smaller batches, but the logic may need more investigation (how to define and obtain a sort flag at this low level, etc.).

I mean that if there are already smaller batches in the in-progress array and the condition holds, the array will be created and the in-progress batches will be copied, creating two batches.
I think it would be worth testing only taking the short path when the condition holds and there are no (smaller) in-progress batches, so that only one output batch is generated.

Interesting, so if I understand correctly, there are two cases. For example, with a target batch size of 1000 and these input batches:

  1. 20, 20, 700, 20, 30, 700, 30, 20, 10, 15
  2. 20, 20, 30, 700, 600, 700, 900, 700, 600

We can only apply the short path to consecutive large batches, i.e. case 2, because when we meet consecutive large batches there are no in-progress batches, so we can output them directly.

But for case 1 we will not apply the short path and will keep coalescing, because before each large batch we already have small batches in progress.

Am I right? I am not sure about the performance result, I need to test it with DataFusion, thanks!

Contributor Author

I mean that if there are already smaller batches in the in-progress array and the condition holds, the array will be created and the in-progress batches will be copied, creating two batches.
I think it would be worth testing only taking the short path when the condition holds and there are no (smaller) in-progress batches, so that only one output batch is generated.

Addressed the above comments in the latest PR.

Also, thinking about it, we could as well avoid flushing the in-progress array if we do not require the output to be sorted (so smaller in-progress batches would still be combined into an output batch of the target size).

Also added a TODO for this point; I can create a follow-up ticket after this PR, thanks!

Contributor Author

Current behaviour for this PR:

(This comment quoted the full behaviour documentation block verbatim: the bypass overview, Cases 1-3, the default-vs-optimized comparison, the benefits and trade-offs, and the unsorted-batches TODO. It is identical to the block in the PR description above.)

Contributor

Also, thinking about it, we could as well avoid flushing the in-progress array if we do not require the output to be sorted (so smaller in-progress batches would still be combined into an output batch of the target size).

Also added a TODO for this point; I can create a follow-up ticket after this PR, thanks!

I think this is a wise idea. I suspect there are many things in DataFusion that implicitly assume rows are not reordered, so we will have to test such a change carefully

Contributor

@alamb alamb left a comment

Thank you @zhuqi-lucas -- this looks great and very well tested

I have a few comments, and I think @Dandandan's is worth resolving as well, but otherwise I think this PR is ready to go


}

#[test]
fn test_biggest_coalesce_batch_size_constructor_method() {
Contributor

I don't really understand this test

Contributor Author

Removed it in the latest PR; I also think it's not needed.

}

#[test]
fn test_biggest_coalesce_batch_size_getter_setter() {
Contributor

I am not sure it is strictly necessary to test the getter/setter

Contributor Author

I am not sure it is strictly necessary to test the getter/setter

Thank you @alamb, I agree it's not needed; removed in the latest PR.

@zhuqi-lucas
Contributor Author

Thank you @zhuqi-lucas -- this looks great and very well tested

I have a few comments, and I think @Dandandan's is worth resolving as well, but otherwise I think this PR is ready to go

Thank you @alamb for the review, addressed the comments in the latest PR; I will also wait for @Dandandan's feedback, thanks!

@zhuqi-lucas
Contributor Author

Thank you @Dandandan @alamb, updated in the latest PR:

  1. Addressed #8146 (comment) in the latest PR
  2. Also added rich unit tests to confirm the result.

Added a TODO, and maybe a follow-up ticket, about the no-sort optimization; it will be more complex, but useful.

@zhuqi-lucas zhuqi-lucas changed the title from "feat: support push batch direct to completed as a public API and add biggest coalesce batch support" to "feat: support push batch direct to completed and add biggest coalesce batch support" Aug 17, 2025
Contributor

@alamb alamb left a comment

Thanks (again) @zhuqi-lucas and @Dandandan

I pushed a few OCD comment updates to this PR, but I don't think they were strictly necessary. I just had the code checked out and couldn't help myself.

This is really nice and a good example of careful optimization work 👨‍🍳 👌


@zhuqi-lucas
Contributor Author

Thanks (again) @zhuqi-lucas and @Dandandan

I pushed a few OCD comment updates to this PR, but I don't think they were strictly necessary. I just had the code checked out and couldn't help myself.

This is really nice and a good example of careful optimization work 👨‍🍳 👌

Great, thank you @alamb !

@alamb alamb merged commit e0f9382 into apache:main Aug 19, 2025
25 of 26 checks passed
@alamb
Contributor

alamb commented Aug 19, 2025

Thanks again @zhuqi-lucas
