Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
f93d36e
Add support for file row numbers in Parquet readers
jkylling Mar 16, 2025
e485c0b
Add Apache license header to row_number.rs
jkylling Mar 26, 2025
2a62009
Run cargo format
jkylling Mar 26, 2025
fb5126f
Change with_row_number_column to take impl Into<String>
jkylling Mar 27, 2025
5350728
Change Option<String> -> Option<&str> in build_array_reader
jkylling Mar 27, 2025
188f350
Replace ParquetError::RowGroupMetaDataMissingRowNumber with General
jkylling Mar 27, 2025
37a9d83
Split test_create_array_reader test into two
jkylling Mar 27, 2025
41e38fe
first_row_number -> first_row_index
jkylling Mar 28, 2025
1a1e6b6
Simplify RowNumberReader with iterators
jkylling May 11, 2025
bcad87f
Merge remote-tracking branch 'origin/main' into feature/parquet-reade…
vustef Oct 23, 2025
89c1fd1
add parquet-testing change from the merge
vustef Oct 23, 2025
b0d53d0
Fix test_arrow_reader_all_columns
vustef Oct 23, 2025
094ae81
Fix first_row_number
vustef Oct 23, 2025
a5858df
Rename to first_row_index consistently, remove Option.
vustef Oct 23, 2025
5e7d9a1
revert parquet-testing update
vustef Oct 23, 2025
54c22c6
Fix baselines in file::metadata::tests::test_memory_size
vustef Oct 23, 2025
f05d470
Fix encryption metadata and async tests. Those features and default f…
vustef Oct 23, 2025
11e4f39
RowNumber extension type
vustef Oct 24, 2025
d02c977
using supplied_schema works
vustef Oct 24, 2025
6fecc17
Don't modify parsing of parquet schema, virtual columns can only be a…
vustef Oct 24, 2025
1414421
Reworked with_virtual_columns in options
vustef Oct 27, 2025
07eb467
switch to ref to slice; cleanup with_row_number_columns; async tests …
vustef Oct 27, 2025
af0e0f9
Bring back optionality to first_row_index, for future consideration w…
vustef Oct 27, 2025
8bccd22
Reexport
vustef Oct 27, 2025
65679ba
reexport all within virtual_type
vustef Oct 27, 2025
968d461
pub mod virtual_type skipping experimental schema
vustef Oct 27, 2025
6144967
Switch back to `virtual_type::*` for now; fix warnings on cargo test
vustef Oct 27, 2025
3af3ad7
Fix `projected_fields` assertion in async reader
vustef Oct 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions parquet/examples/read_with_rowgroup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ impl RowGroups for InMemoryRowGroup {
}
}
}

fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
Box::new(std::iter::once(&self.metadata))
}
}

impl InMemoryRowGroup {
Expand All @@ -151,6 +155,7 @@ impl InMemoryRowGroup {
&self.metadata.schema_descr_ptr(),
self.mask.clone(),
None,
&[],
)?;

ParquetRecordBatchReader::try_new_with_row_groups(&levels, self, batch_size, selection)
Expand Down
59 changes: 53 additions & 6 deletions parquet/src/arrow/array_reader/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use std::sync::{Arc, Mutex};

use arrow_schema::{DataType, Fields, SchemaBuilder};
use std::sync::{Arc, Mutex};

use crate::arrow::ProjectionMask;
use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader;
Expand All @@ -26,13 +25,15 @@ use crate::arrow::array_reader::cached_array_reader::CachedArrayReader;
use crate::arrow::array_reader::empty_array::make_empty_array_reader;
use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
use crate::arrow::array_reader::row_group_cache::RowGroupCache;
use crate::arrow::array_reader::row_number::RowNumberReader;
use crate::arrow::array_reader::{
ArrayReader, FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
PrimitiveArrayReader, RowGroups, StructArrayReader, make_byte_array_dictionary_reader,
make_byte_array_reader,
};
use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
use crate::arrow::schema::{ParquetField, ParquetFieldType};
use crate::arrow::schema::{ParquetField, ParquetFieldType, VirtualColumnType};
use crate::arrow::schema::virtual_type::RowNumber;
use crate::basic::Type as PhysicalType;
use crate::data_type::{BoolType, DoubleType, FloatType, Int32Type, Int64Type, Int96Type};
use crate::errors::{ParquetError, Result};
Expand Down Expand Up @@ -113,7 +114,7 @@ impl<'a> ArrayReaderBuilder<'a> {
&self,
field: Option<&ParquetField>,
mask: &ProjectionMask,
) -> Result<Box<dyn ArrayReader>> {
) -> Result<Box<dyn ArrayReader>> {
let reader = field
.and_then(|field| self.build_reader(field, mask).transpose())
.transpose()?
Expand All @@ -131,7 +132,7 @@ impl<'a> ArrayReaderBuilder<'a> {
&self,
field: &ParquetField,
mask: &ProjectionMask,
) -> Result<Option<Box<dyn ArrayReader>>> {
) -> Result<Option<Box<dyn ArrayReader>>> {
match field.field_type {
ParquetFieldType::Primitive { col_idx, .. } => {
let Some(reader) = self.build_primitive_reader(field, mask)? else {
Expand All @@ -153,6 +154,13 @@ impl<'a> ArrayReaderBuilder<'a> {
Ok(Some(reader))
}
}
ParquetFieldType::Virtual(virtual_type) => {
// Virtual columns don't have data in the parquet file
// They need to be built by specialized readers
match virtual_type {
VirtualColumnType::RowNumber => Ok(Some(self.build_row_number_reader()?)),
}
}
ParquetFieldType::Group { .. } => match &field.arrow_type {
DataType::Map(_, _) => self.build_map_reader(field, mask),
DataType::Struct(_) => self.build_struct_reader(field, mask),
Expand All @@ -164,6 +172,10 @@ impl<'a> ArrayReaderBuilder<'a> {
}
}

fn build_row_number_reader(&self) -> Result<Box<dyn ArrayReader>> {
Ok(Box::new(RowNumberReader::try_new(self.row_groups.row_groups())?))
}

/// Build array reader for map type.
fn build_map_reader(
&self,
Expand Down Expand Up @@ -401,7 +413,7 @@ impl<'a> ArrayReaderBuilder<'a> {
&self,
field: &ParquetField,
mask: &ProjectionMask,
) -> Result<Option<Box<dyn ArrayReader>>> {
) -> Result<Option<Box<dyn ArrayReader>>> {
let arrow_fields = match &field.arrow_type {
DataType::Struct(children) => children,
_ => unreachable!(),
Expand Down Expand Up @@ -455,6 +467,7 @@ mod tests {
file_metadata.schema_descr(),
ProjectionMask::all(),
file_metadata.key_value_metadata(),
&[],
)
.unwrap();

Expand All @@ -472,4 +485,38 @@ mod tests {

assert_eq!(array_reader.get_data_type(), &arrow_type);
}

#[test]
fn test_create_array_reader_with_row_numbers() {
let file = get_test_file("nulls.snappy.parquet");
let file_reader: Arc<dyn FileReader> = Arc::new(SerializedFileReader::new(file).unwrap());

let file_metadata = file_reader.metadata().file_metadata();
let mask = ProjectionMask::leaves(file_metadata.schema_descr(), [0]);
let row_number_field = Field::new("row_number", DataType::Int64, false).with_extension_type(RowNumber);
let (_, fields) = parquet_to_arrow_schema_and_fields(
file_metadata.schema_descr(),
ProjectionMask::all(),
file_metadata.key_value_metadata(),
std::slice::from_ref(&row_number_field),
)
.unwrap();

let metrics = ArrowReaderMetrics::disabled();
let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
.build_array_reader(fields.as_ref(), &mask)
.unwrap();

// Create arrow types
let arrow_type = DataType::Struct(Fields::from(vec![
Field::new(
"b_struct",
DataType::Struct(vec![Field::new("b_c_int", DataType::Int32, true)].into()),
true,
),
row_number_field,
]));

assert_eq!(array_reader.get_data_type(), &arrow_type);
}
}
1 change: 1 addition & 0 deletions parquet/src/arrow/array_reader/list_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,7 @@ mod tests {
schema,
ProjectionMask::all(),
file_metadata.key_value_metadata(),
&[],
)
.unwrap();

Expand Down
10 changes: 9 additions & 1 deletion parquet/src/arrow/array_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,13 @@ mod map_array;
mod null_array;
mod primitive_array;
mod row_group_cache;
mod row_number;
mod struct_array;

#[cfg(test)]
mod test_util;

// Note that this crate is public under the `experimental` feature flag.
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't remove this comment

use crate::file::metadata::RowGroupMetaData;
pub use builder::{ArrayReaderBuilder, CacheOptions, CacheOptionsBuilder};
pub use byte_array::make_byte_array_reader;
pub use byte_array_dictionary::make_byte_array_dictionary_reader;
Expand Down Expand Up @@ -139,6 +140,9 @@ pub trait RowGroups {
/// Returns a [`PageIterator`] for all pages in the specified column chunk
/// across all row groups in this collection.
fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;

/// Returns an iterator over the row groups in this collection
fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_>;
}

impl RowGroups for Arc<dyn FileReader> {
Expand All @@ -150,6 +154,10 @@ impl RowGroups for Arc<dyn FileReader> {
let iterator = FilePageIterator::new(column_index, Arc::clone(self))?;
Ok(Box::new(iterator))
}

fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
Box::new(self.metadata().row_groups().iter())
}
}

/// Uses `record_reader` to read up to `batch_size` records from `pages`
Expand Down
84 changes: 84 additions & 0 deletions parquet/src/arrow/array_reader/row_number.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::arrow::array_reader::ArrayReader;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::RowGroupMetaData;
use arrow_array::{ArrayRef, Int64Array};
use arrow_schema::DataType;
use std::any::Any;
use std::sync::Arc;

pub(crate) struct RowNumberReader {
buffered_row_numbers: Vec<i64>,
remaining_row_numbers: std::iter::Flatten<std::vec::IntoIter<std::ops::Range<i64>>>,
}

impl RowNumberReader {
pub(crate) fn try_new<'a>(
row_groups: impl Iterator<Item = &'a RowGroupMetaData>,
) -> Result<Self> {
let ranges = row_groups
.map(|rg| {
let first_row_index = rg.first_row_index().ok_or(ParquetError::General(
"Row group missing row number".to_string(),
))?;
Ok(first_row_index..first_row_index + rg.num_rows())
})
.collect::<Result<Vec<_>>>()?;
Ok(Self {
buffered_row_numbers: Vec::new(),
remaining_row_numbers: ranges.into_iter().flatten(),
})
}
}

impl ArrayReader for RowNumberReader {
fn read_records(&mut self, batch_size: usize) -> Result<usize> {
let starting_len = self.buffered_row_numbers.len();
self.buffered_row_numbers
.extend((&mut self.remaining_row_numbers).take(batch_size));
Ok(self.buffered_row_numbers.len() - starting_len)
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
// TODO: Use advance_by when it stabilizes to improve performance
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO from original PR

Ok((&mut self.remaining_row_numbers).take(num_records).count())
}

fn as_any(&self) -> &dyn Any {
self
}

fn get_data_type(&self) -> &DataType {
&DataType::Int64
}

fn consume_batch(&mut self) -> Result<ArrayRef> {
Ok(Arc::new(Int64Array::from_iter(
self.buffered_row_numbers.drain(..),
)))
}

fn get_def_levels(&self) -> Option<&[i16]> {
None
}

fn get_rep_levels(&self) -> Option<&[i16]> {
None
}
}
Loading