Skip to content

Commit a47d996

Browse files
[Parquet] Reuse buffer in ByteViewArrayDecoderPlain (#6930)
* reuse buffer in view array
* Update parquet/src/arrow/array_reader/byte_view_array.rs

  Co-authored-by: Raphael Taylor-Davies <[email protected]>
* use From<Bytes> instead

---------

Co-authored-by: Raphael Taylor-Davies <[email protected]>
1 parent 74499c0 commit a47d996

File tree

1 file changed

+32
-6
lines changed

1 file changed

+32
-6
lines changed

parquet/src/arrow/array_reader/byte_view_array.rs

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ impl ByteViewArrayDecoder {
290290

291291
/// Decoder from [`Encoding::PLAIN`] data to [`ViewBuffer`]
292292
pub struct ByteViewArrayDecoderPlain {
293-
buf: Bytes,
293+
buf: Buffer,
294294
offset: usize,
295295

296296
validate_utf8: bool,
@@ -308,17 +308,23 @@ impl ByteViewArrayDecoderPlain {
308308
validate_utf8: bool,
309309
) -> Self {
310310
Self {
311-
buf,
311+
buf: Buffer::from(buf),
312312
offset: 0,
313313
max_remaining_values: num_values.unwrap_or(num_levels),
314314
validate_utf8,
315315
}
316316
}
317317

318318
pub fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
319-
// Zero copy convert `bytes::Bytes` into `arrow_buffer::Buffer`
320-
let buf = arrow_buffer::Buffer::from(self.buf.clone());
321-
let block_id = output.append_block(buf);
319+
// Avoid appending a new block when the last buffer in `output` is already this decoder's buffer.
320+
// This is especially useful when row-level filtering is applied, where many small `read` calls are made over the same buffer.
321+
let block_id = {
322+
if output.buffers.last().is_some_and(|x| x.ptr_eq(&self.buf)) {
323+
output.buffers.len() as u32 - 1
324+
} else {
325+
output.append_block(self.buf.clone())
326+
}
327+
};
322328

323329
let to_read = len.min(self.max_remaining_values);
324330

@@ -690,12 +696,13 @@ mod tests {
690696

691697
use crate::{
692698
arrow::{
693-
array_reader::test_util::{byte_array_all_encodings, utf8_column},
699+
array_reader::test_util::{byte_array_all_encodings, encode_byte_array, utf8_column},
694700
buffer::view_buffer::ViewBuffer,
695701
record_reader::buffer::ValuesBuffer,
696702
},
697703
basic::Encoding,
698704
column::reader::decoder::ColumnValueDecoder,
705+
data_type::ByteArray,
699706
};
700707

701708
use super::*;
@@ -746,4 +753,23 @@ mod tests {
746753
);
747754
}
748755
}
756+
757+
#[test]
758+
fn test_byte_view_array_plain_decoder_reuse_buffer() {
759+
let byte_array = vec!["hello", "world", "large payload over 12 bytes", "b"];
760+
let byte_array: Vec<ByteArray> = byte_array.into_iter().map(|x| x.into()).collect();
761+
let pages = encode_byte_array(Encoding::PLAIN, &byte_array);
762+
763+
let column_desc = utf8_column();
764+
let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc);
765+
766+
let mut view_buffer = ViewBuffer::default();
767+
decoder.set_data(Encoding::PLAIN, pages, 4, None).unwrap();
768+
decoder.read(&mut view_buffer, 1).unwrap();
769+
decoder.read(&mut view_buffer, 1).unwrap();
770+
assert_eq!(view_buffer.buffers.len(), 1);
771+
772+
decoder.read(&mut view_buffer, 1).unwrap();
773+
assert_eq!(view_buffer.buffers.len(), 1);
774+
}
749775
}

0 commit comments

Comments
 (0)