Skip to content

Commit 471f3b1

Browse files
authored
[Minor] Backport changes to metadata benchmark (#8251)
# Which issue does this PR close?

- Part of #5854.

# Rationale for this change

Backport changes to allow an apples-to-apples comparison of Thrift decoding.

# What changes are included in this PR?

Adds a page header benchmark and updates the benchmark names to match those in the feature branch.

# Are these changes tested?

No tests are needed; this PR only changes a benchmark.

# Are there any user-facing changes?

No.
1 parent 9709c09 commit 471f3b1

File tree

2 files changed

+92
-7
lines changed

2 files changed

+92
-7
lines changed

parquet/benches/metadata.rs

Lines changed: 83 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use parquet::file::metadata::ParquetMetaDataReader;
1819
use rand::Rng;
1920
use thrift::protocol::TCompactOutputProtocol;
2021

@@ -25,7 +26,7 @@ use parquet::file::reader::SerializedFileReader;
2526
use parquet::file::serialized_reader::ReadOptionsBuilder;
2627
use parquet::format::{
2728
ColumnChunk, ColumnMetaData, CompressionCodec, Encoding, FieldRepetitionType, FileMetaData,
28-
RowGroup, SchemaElement, Type,
29+
PageEncodingStats, PageType, RowGroup, SchemaElement, Type,
2930
};
3031
use parquet::thrift::TSerializable;
3132

@@ -93,7 +94,18 @@ fn encoded_meta() -> Vec<u8> {
9394
index_page_offset: Some(rng.random()),
9495
dictionary_page_offset: Some(rng.random()),
9596
statistics: Some(stats.clone()),
96-
encoding_stats: None,
97+
encoding_stats: Some(vec![
98+
PageEncodingStats {
99+
page_type: PageType::DICTIONARY_PAGE,
100+
encoding: Encoding::PLAIN,
101+
count: 1,
102+
},
103+
PageEncodingStats {
104+
page_type: PageType::DATA_PAGE,
105+
encoding: Encoding::RLE_DICTIONARY,
106+
count: 10,
107+
},
108+
]),
97109
bloom_filter_offset: None,
98110
bloom_filter_length: None,
99111
size_statistics: None,
@@ -151,6 +163,36 @@ fn get_footer_bytes(data: Bytes) -> Bytes {
151163
data.slice(meta_start..meta_end)
152164
}
153165

166+
// Benchmark helper, compiled only when the `arrow` feature is enabled.
//
// Reads the Parquet file held in `bytes` as Arrow record batches and writes
// every batch back out to an in-memory buffer using writer properties that
// enable page-level statistics and page-header statistics. Returns the
// rewritten file's bytes together with the `FileMetaData` returned by
// closing the writer.
//
// Panics (via `expect`) if opening the input, creating the writer, reading a
// batch, writing a batch, or finalizing the file fails.
#[cfg(feature = "arrow")]
167+
fn rewrite_file(bytes: Bytes) -> (Bytes, FileMetaData) {
168+
use arrow::array::RecordBatchReader;
169+
use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter};
170+
use parquet::file::properties::{EnabledStatistics, WriterProperties};
171+
172+
// Build a record batch reader over the input file.
let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(bytes)
173+
.expect("parquet open")
174+
.build()
175+
.expect("parquet open");
176+
// Request page-level statistics and have them written into page headers.
let writer_properties = WriterProperties::builder()
177+
.set_statistics_enabled(EnabledStatistics::Page)
178+
.set_write_page_header_statistics(true)
179+
.build();
180+
// The rewritten file is accumulated in memory.
let mut output = Vec::new();
181+
// The writer reuses the schema reported by the reader.
let mut parquet_writer = ArrowWriter::try_new(
182+
&mut output,
183+
parquet_reader.schema(),
184+
Some(writer_properties),
185+
)
186+
.expect("create arrow writer");
187+
188+
// Copy every batch from the reader to the writer.
for maybe_batch in parquet_reader {
189+
let batch = maybe_batch.expect("reading batch");
190+
parquet_writer.write(&batch).expect("writing data");
191+
}
192+
// Closing the writer finalizes the file and yields its metadata.
let file_meta = parquet_writer.close().expect("finalizing file");
193+
(output.into(), file_meta)
194+
}
195+
154196
fn criterion_benchmark(c: &mut Criterion) {
155197
// Read file into memory to isolate filesystem performance
156198
let file = "../parquet-testing/data/alltypes_tiny_pages.parquet";
@@ -168,19 +210,54 @@ fn criterion_benchmark(c: &mut Criterion) {
168210
})
169211
});
170212

171-
let meta_data = get_footer_bytes(data);
172-
c.bench_function("decode file metadata", |b| {
213+
let meta_data = get_footer_bytes(data.clone());
214+
c.bench_function("decode parquet metadata", |b| {
215+
b.iter(|| {
216+
ParquetMetaDataReader::decode_metadata(&meta_data).unwrap();
217+
})
218+
});
219+
220+
c.bench_function("decode thrift file metadata", |b| {
173221
b.iter(|| {
174222
parquet::thrift::bench_file_metadata(&meta_data);
175223
})
176224
});
177225

178-
let buf = black_box(encoded_meta()).into();
179-
c.bench_function("decode file metadata (wide)", |b| {
226+
let buf: Bytes = black_box(encoded_meta()).into();
227+
c.bench_function("decode parquet metadata (wide)", |b| {
228+
b.iter(|| {
229+
ParquetMetaDataReader::decode_metadata(&buf).unwrap();
230+
})
231+
});
232+
233+
c.bench_function("decode thrift file metadata (wide)", |b| {
180234
b.iter(|| {
181235
parquet::thrift::bench_file_metadata(&buf);
182236
})
183237
});
238+
239+
// rewrite file with page statistics. then read page headers.
240+
#[cfg(feature = "arrow")]
241+
let (file_bytes, metadata) = rewrite_file(data.clone());
242+
#[cfg(feature = "arrow")]
243+
c.bench_function("page headers", |b| {
244+
b.iter(|| {
245+
metadata.row_groups.iter().for_each(|rg| {
246+
rg.columns.iter().for_each(|col| {
247+
if let Some(col_meta) = &col.meta_data {
248+
if let Some(dict_offset) = col_meta.dictionary_page_offset {
249+
parquet::thrift::bench_page_header(
250+
&file_bytes.slice(dict_offset as usize..),
251+
);
252+
}
253+
parquet::thrift::bench_page_header(
254+
&file_bytes.slice(col_meta.data_page_offset as usize..),
255+
);
256+
}
257+
});
258+
});
259+
})
260+
});
184261
}
185262

186263
criterion_group!(benches, criterion_benchmark);

parquet/src/thrift.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,20 @@ pub trait TSerializable: Sized {
3333
fn write_to_out_protocol<T: TOutputProtocol>(&self, o_prot: &mut T) -> thrift::Result<()>;
3434
}
3535

36-
/// Public function to aid benchmarking.
36+
// Public function to aid benchmarking. Reads Parquet `FileMetaData` encoded in `bytes`.
// The decoded value is discarded. Panics (via `unwrap`) if decoding fails.
37+
#[doc(hidden)]
3738
pub fn bench_file_metadata(bytes: &bytes::Bytes) {
3839
let mut input = TCompactSliceInputProtocol::new(bytes);
3940
crate::format::FileMetaData::read_from_in_protocol(&mut input).unwrap();
4041
}
4142

43+
// Public function to aid benchmarking. Reads Parquet `PageHeader` encoded in `bytes`.
// Decoding starts at the beginning of `bytes`. The decoded value is discarded.
// Panics (via `unwrap`) if decoding fails.
44+
#[doc(hidden)]
45+
pub fn bench_page_header(bytes: &bytes::Bytes) {
46+
let mut prot = TCompactSliceInputProtocol::new(bytes);
47+
crate::format::PageHeader::read_from_in_protocol(&mut prot).unwrap();
48+
}
49+
4250
/// A more performant implementation of [`TCompactInputProtocol`] that reads a slice
4351
///
4452
/// [`TCompactInputProtocol`]: thrift::protocol::TCompactInputProtocol

0 commit comments

Comments
 (0)