Skip to content

Commit a19fc62

Browse files
authored
Add BatchCoalescer::push_filtered_batch and docs (#7652)
# Which issue does this PR close? We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. - Part of #7650 # Rationale for this change In order to coalesce the result of applying a filter currently requires first copying the results into an intermediate array (calling `filter`). My plan is to remove this extra copy by building the final array up directly incrementally To do to so, there needs to be an API that can take the original data and the filter # What changes are included in this PR? 1. Add `BatchCoalescer::push_filtered_batch` and docs 2. Update benchmarks to use it # Are there any user-facing changes? New API
1 parent e1ade7b commit a19fc62

File tree

2 files changed

+59
-7
lines changed

2 files changed

+59
-7
lines changed

arrow-select/src/coalesce.rs

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@
2121
//! [`filter`]: crate::filter::filter
2222
//! [`take`]: crate::take::take
2323
use crate::concat::concat_batches;
24-
use arrow_array::StringViewArray;
24+
use crate::filter::filter_record_batch;
2525
use arrow_array::{cast::AsArray, Array, ArrayRef, RecordBatch};
26+
use arrow_array::{BooleanArray, StringViewArray};
2627
use arrow_data::ByteView;
2728
use arrow_schema::{ArrowError, SchemaRef};
2829
use std::collections::VecDeque;
2930
use std::sync::Arc;
30-
3131
// Originally From DataFusion's coalesce module:
3232
// https://github.com/apache/datafusion/blob/9d2f04996604e709ee440b65f41e7b882f50b788/datafusion/physical-plan/src/coalesce/mod.rs#L26-L25
3333

@@ -155,9 +155,62 @@ impl BatchCoalescer {
155155
Arc::clone(&self.schema)
156156
}
157157

158-
/// Push next batch into the Coalescer
158+
/// Push a batch into the Coalescer after applying a filter
159+
///
160+
/// This is semantically equivalent of calling [`Self::push_batch`]
161+
/// with the results from [`filter_record_batch`]
162+
///
163+
/// # Example
164+
/// # Example
165+
/// ```
166+
/// # use arrow_array::{record_batch, BooleanArray};
167+
/// # use arrow_select::coalesce::BatchCoalescer;
168+
/// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
169+
/// let batch2 = record_batch!(("a", Int32, [4, 5, 6])).unwrap();
170+
/// // Apply a filter to each batch to pick the first and last row
171+
/// let filter = BooleanArray::from(vec![true, false, true]);
172+
/// // create a new Coalescer that targets creating 1000 row batches
173+
/// let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
174+
/// coalescer.push_batch_with_filter(batch1, &filter);
175+
/// coalescer.push_batch_with_filter(batch2, &filter);
176+
/// // finsh and retrieve the created batch
177+
/// coalescer.finish_buffered_batch().unwrap();
178+
/// let completed_batch = coalescer.next_completed_batch().unwrap();
179+
/// // filtered out 2 and 5:
180+
/// let expected_batch = record_batch!(("a", Int32, [1, 3, 4, 6])).unwrap();
181+
/// assert_eq!(completed_batch, expected_batch);
182+
/// ```
183+
pub fn push_batch_with_filter(
184+
&mut self,
185+
batch: RecordBatch,
186+
filter: &BooleanArray,
187+
) -> Result<(), ArrowError> {
188+
// TODO: optimize this to avoid materializing (copying the results
189+
// of filter to a new batch)
190+
let filtered_batch = filter_record_batch(&batch, filter)?;
191+
self.push_batch(filtered_batch)
192+
}
193+
194+
/// Push all the rows from `batch` into the Coalescer
159195
///
160196
/// See [`Self::next_completed_batch()`] to retrieve any completed batches.
197+
///
198+
/// # Example
199+
/// ```
200+
/// # use arrow_array::record_batch;
201+
/// # use arrow_select::coalesce::BatchCoalescer;
202+
/// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
203+
/// let batch2 = record_batch!(("a", Int32, [4, 5, 6])).unwrap();
204+
/// // create a new Coalescer that targets creating 1000 row batches
205+
/// let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
206+
/// coalescer.push_batch(batch1);
207+
/// coalescer.push_batch(batch2);
208+
/// // finsh and retrieve the created batch
209+
/// coalescer.finish_buffered_batch().unwrap();
210+
/// let completed_batch = coalescer.next_completed_batch().unwrap();
211+
/// let expected_batch = record_batch!(("a", Int32, [1, 2, 3, 4, 5, 6])).unwrap();
212+
/// assert_eq!(completed_batch, expected_batch);
213+
/// ```
161214
pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError> {
162215
if batch.num_rows() == 0 {
163216
// If the batch is empty, we don't need to do anything

arrow/benches/coalesce_kernels.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -214,10 +214,9 @@ fn filter_streams(
214214
while num_output_batches > 0 {
215215
let filter = filter_stream.next_filter();
216216
let batch = data_stream.next_batch();
217-
// Apply the filter to the input batch
218-
let filtered_batch = arrow_select::filter::filter_record_batch(batch, filter).unwrap();
219-
// Add the filtered batch to the coalescer
220-
coalescer.push_batch(filtered_batch).unwrap();
217+
coalescer
218+
.push_batch_with_filter(batch.clone(), filter)
219+
.unwrap();
221220
// consume (but discard) the output batch
222221
if coalescer.next_completed_batch().is_some() {
223222
num_output_batches -= 1;

0 commit comments

Comments
 (0)