-
Notifications
You must be signed in to change notification settings - Fork 1k
Add support for file row numbers in Parquet readers #7307
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Add support for file row numbers in Parquet readers #7307
Conversation
|
Thanks for you submission @jkylling, I'll try to get a first pass review done this week. In the meantime please add the Apache license to row_number.rs and correct the other lint errors. 🙏 |
Updated. Looking forward to the first review! I was very confused as to why cargo format did not work properly, but looks like you are already aware of this (#6179) :) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Partial review, just a few nits for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks again @jkylling for taking this on. I've finished my first pass and have only one reservation. Otherwise it looks good and meets the criteria set forth in #7299 (comment).
| row_groups: VecDeque::from( | ||
| row_groups | ||
| .into_iter() | ||
| .map(TryInto::try_into) | ||
| .collect::<Result<Vec<_>>>()?, | ||
| ), | ||
| }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm finding myself a bit uneasy with adding the first row number to the RowGroupMetaData. Rather than that, could this bit here instead be changed to keep track of the first row number while populating the deque? Is there some wrinkle I'm missing? Might the row groups be filtered before instantiating the RowNumberReader?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Answered my own question...it seems there's some complexity here at least when using the async reader.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I believe we don't have access to all row groups when creating the array readers.
I took a quick look at the corresponding Parquet reader implementations for Trino and parquet-java.
Trino:
- Has a boolean to include a row number column, https://github.com/trinodb/trino/blob/a54d38a30e486a94a365c7f12a94e47beb30b0fa/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java#L112
- Includes this column when the boolean is set: https://github.com/trinodb/trino/blob/a54d38a30e486a94a365c7f12a94e47beb30b0fa/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java#L337
- Has a special block reader for reading row indexes https://github.com/trinodb/trino/blob/a54d38a30e486a94a365c7f12a94e47beb30b0fa/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java#L385-L393 I believe the positions play a similar role to our
RowSelectors. - Gets row indexes from
RowGroupInfo, a pruned version of https://github.com/trinodb/trino/blob/a54d38a30e486a94a365c7f12a94e47beb30b0fa/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java#L456 - Populates the
fileRowOffsetby iterating through the row groups: https://github.com/trinodb/trino/blob/master/lib/trino-parquet/src/main/java/io/trino/parquet/metadata/ParquetMetadata.java#L107-L111
parquet-java:
- Has a method for tracking the current row index: https://github.com/apache/parquet-java/blob/7d1fe32c8c972710a9d780ec5e7d1f95d871374d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java#L150-L155
- This row index is based on an iterator which starts form a row group row index, https://github.com/apache/parquet-java/blob/7d1fe32c8c972710a9d780ec5e7d1f95d871374d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java#L311-L339
- This row group row index is initialized by iterating through the row groups: https://github.com/apache/parquet-java/blob/7d1fe32c8c972710a9d780ec5e7d1f95d871374d/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L1654-L1656 (mapping obtained here: https://github.com/apache/parquet-java/blob/7d1fe32c8c972710a9d780ec5e7d1f95d871374d/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L1496-L1506)
Their approaches are rather similar to ours.
One take away is that the above implementations do not be keep the full RowGroupMetaDatas around as we do by requiring an iterator over RowGroupMetadata in the RowGroups trait. This is likely a good idea as this struct can be quite large. What do you think about changing the RowGroups trait to something like below?
/// A collection of row groups
pub trait RowGroups {
/// Get the number of rows in this collection
fn num_rows(&self) -> usize {
self.row_group_infos.iter().map(|info| info.num_rows).sum()
}
/// Returns a [`PageIterator`] for the column chunks with the given leaf column index
fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;
/// Returns an iterator over the row groups in this collection
fn row_group_infos(&self) -> Box<dyn Iterator<Item = &RowGroupInfo> + '_>;
}
struct RowGroupInfo {
num_rows: usize,
row_index: i64,
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is necessary...the full ParquetMetaData is already available everywhere this trait is implemented, so I don't see a need to worry about adding another metadata structure here.
parquet/src/file/metadata/mod.rs
Outdated
| self.num_rows | ||
| } | ||
|
|
||
| /// Returns the first row number in this row group. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| /// Returns the first row number in this row group. | |
| /// Returns the global index number for the first row in this row group. |
And perhaps use first_row_index instead? That may be clearer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree. Updated.
Sorry @jkylling, things have been rather hectic lately. I'll try to give it another look this week, along with some benchmarking (but I don't expect any perf hit). I'll just note that since this is a breaking change, it won't be able to be merged until the next major release (July-ish IIRC), so there's plenty of time to get this right. Also, I'll be deferring to those with more project history (e.g. @alamb @tustvold) as to whether the approach here is the best way to achieve the goal. Thank you for your contribution and your patience! 😄 |
|
Yeah, sorry I also have been slammed with many other projects. I'll try and find time to look but I suspect it may be a while |
|
Thank you for the update, and totally understand other responsibilities are taking up your time. I'll keep on being patient, and maybe do some minor improvements to this PR (use a smaller struct than the full RowGroupMetadata, and add some benchmarks for the RowNumberReader). Just want to make sure we have this PR ready before the next major release approaches. |
|
Yes, a benchmark that shows minimal impact with no row numbers would be nice (and hopefully adding row numbers won't be bad either 😄). |
parquet/src/file/metadata/reader.rs
Outdated
|
|
||
| let mut first_row_number = 0; | ||
| let mut row_groups = Vec::new(); | ||
| t_file_metadata.row_groups.sort_by_key(|rg| rg.ordinal); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these sorts necessary? Would the ordinal ever be out of order? They shouldn't be if I understand the encryption spec correctly.
100% agreed that simplicity and maintainability are paramount... but row numbers are a pretty fundamental feature that's very hard to emulate in higher layers if the parquet reader doesn't support them. Back when https://github.com/delta-io/delta first took a dependency on row numbers, spark's parquet reader did not yet support them; we had to disable row group pruning and other optimizations in order to make it (mostly) safe to manually compute row numbers in the query engine. It was really painful. AFAIK, most parquet readers now support row numbers. We can add DuckDB and Iceberg to the ones already mentioned above. I was actually surprised to trip over this PR and learn that arrow-parquet does not yet support row numbers. |
| field: &ParquetField, | ||
| mask: &ProjectionMask, | ||
| row_groups: &dyn RowGroups, | ||
| row_number_column: Option<&str>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe a crazy idea, but wouldn't the implementation be simpler (and more flexible) with a RowNumber extension type? Then users could do e.g.
Field::new("row_index", DataType::Int64, false).with_extension_type(RowNumber))and build_primitive_reader could just check for it, no matter where in the schema it hides, instead of implicitly adding an extra column to the schema?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update: I don't think raw parquet types support metadata, so this may not be an option.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would simplify usage of the feature. Having to keep track of the additional row number column is quite cumbersome in clients of this API. One option could be to extend ParquetFieldType with an additional row number type and add it based on the extension type in ArrowReaderMetadata::with_supplied_metadata? @etseidl @alamb what do you think about this approach?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do other parquet readers do to represent row numbers in their output schema?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do other parquet readers do to represent row numbers in their output schema?
#7307 (comment), posted Apr 15, might be a starting point?
AFAIK, most parquet readers now support row numbers. We can add DuckDB and Iceberg to the ones already mentioned above.
Duckdb uses a column schema type approach. Interestingly, that's new -- last time I looked (nearly a year go) it required the reader to pass options along with the schema, and one of the options was to request row numbers (which then became an extra unnamed column at the end of the regular schema). I think that approach didn't scale as they started needing more and more special column types. I see geometry, variant, and non-materialiaed expressions, for example.
Iceberg's parquet reader works almost exclusively from field ids, and row index has a baked in field id from the range of metadata row ids.
Spark uses a metadata column approach, identified by a special name (_metadata._rowid); I don't remember how precisely that maps to the underlying parquet reader.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤔 maybe we could add an arrow extension type, similarly to what we are doing with Variant and Geometry -- so someone would request a column "foo: int64" with an arrow extension type of "row_number" or something 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That could potentially work, but the problem is a row number column is only meaningful in a read request schema. Once row numbers hit the output, they're just normal int64 values from then on. Things get a lot harder to reason about if the extension type persists. For example, in a join of multiple tables, where each scan is producing row numbers for its respective files, one could easily end up with two row number columns in the join's output. And the parquet writer would definitely need to block writing such columns, or at least strip away the metadata?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any chance we could also return this column with a field ID specified by the user? For Iceberg, we would like the field ID to be according to the spec, and ideally the record batches come already with such schema, rather than having to post-process them.
| struct RowGroupSizeIterator { | ||
| row_groups: VecDeque<RowGroupSize>, | ||
| } | ||
|
|
||
| impl RowGroupSizeIterator { | ||
| fn try_new<I>(row_groups: impl IntoIterator<Item = I>) -> Result<Self> | ||
| where | ||
| I: TryInto<RowGroupSize, Error = ParquetError>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like this whole RowGroupSizeIterator thing is a complicated and error-prone way of chaining several Range<i64>? Can we use standard iterator machinery instead?
pub(crate) struct RowNumberReader {
buffered_row_numbers: Vec<i64>,
remaining_row_numbers: std::iter::Flatten<std::vec::IntoIter<std::ops::Range<i64>>>,
}
impl RowNumberReader {
pub(crate) fn try_new<'a>(
row_groups: impl Iterator<Item = &'a RowGroupMetaData>,
) -> Result<Self> {
let ranges = row_groups
.map(|rg| {
let first_row_number = rg.first_row_index().ok_or(ParquetError::General(
"Row group missing row number".to_string(),
))?;
Ok(first_row_number..first_row_number + rg.num_rows())
})
.collect::<Result<Vec<_>>>()?;
Ok(Self {
buffered_row_numbers: Vec::new(),
remaining_row_numbers: ranges.into_iter().flatten(),
})
}
// Use `take` on a `&mut Iterator` to consume a number of elements without consuming the iterator.
fn take(&mut self, batch_size: usize) -> impl Iterator<Item = i64> {
(&mut self.remaining_row_numbers).take(batch_size)
}
}
impl ArrayReader for RowNumberReader {
fn read_records(&mut self, batch_size: usize) -> Result<usize> {
let starting_len = self.buffered_row_numbers.len();
self.buffered_row_numbers.extend(self.take(batch_size));
Ok(self.buffered_row_numbers.len() - starting_len)
}
fn skip_records(&mut self, num_records: usize) -> Result<usize> {
Ok(self.take(num_records).count())
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is much simpler. Thank you! I suspect we are missing out on some performance in skip_records with this, but the bulk of the data pruning will likely have happened by pruning Parquet row groups already.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@scovich I see you are involved in the maintenance of delta-kernel-rs. If you are interested, I've started on an implementation of deletion vector read support in delta-rs in this branch, based on a back port of an early version of this PR to arrow-54.2.1. The PR is still very rough, but the read path has got okay test coverage and it's able to read tables with deletion vectors produced by Spark correctly. The write support for deletion vectors is rudimentary (deletion vectors are only used for deletes when configured, and deleting from the same file twice is unsupported), and is mostly there to be able to unit test the read support. Unfortunately, I've not had time to wokr on this lately.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW @zhuqi-lucas and I are working on improvements to the filter application here, which may result in some additional API churn:
Co-authored-by: scovich <[email protected]>
+1 on this painpoint, working around this lack of capability from a client perspective is very challenging and comes with a bunch of correctness risks (e.g. we can write some rowIndex column client-side, but then we have to be 100% sure that the emitted rowIndex will perfectly match the Parquet files, which can get quite tricky especially in multi-threaded executions etc.) Would love to this feature landing in Arrow-rs + Datafusion |
Thanks @16pierre -- I agree there is no good workaround for adding row numbers to the output of the parquet reader I think the biggest thing we need to do is to sort out the API for "how does a user request the (virtual) row number column" as todays @scovich and @etseidl 's idea to use some sort of Arrow metadata is interesting, but I am not quite sure how it would look |
The "standard" way in most engines I've seen would (in arrow-rs) include an extension type, that the parquet reader recognizes, in the parquet reader's read schema. Nice, because other readers could choose to honor the same extension type and produce row indexes as well. But it does open the question of whether the parquet reader's output should strip away the metadata -- since arguably the row indexes are just normal data once they've been produced -- and if not, how to prevent e.g. writing the values of a "row index" field back to parquet. Is that a bearable approach? Of should we keep thinking of other ways? |
## What changes are proposed in this pull request? This PR follows up on #1266 and adds support for reading the row index metadata column to the default engine. The implementation directly follows the approach proposed in #920 and slightly modifies it to match the new metadata column API. Quoting from #920 > Deletion vectors (and row tracking, eventually) rely on accurate file-level row indexes. But they're not implemented in the kernel's default parquet reader. That means we must rely on the position of rows in data batches returned by each read, and we cannot apply optimizations such as stats-based row group skipping (see #860). > > Add row index support to the default Parquet reader, in the form of a new RowIndex variant of ReorderIndexTransform. [...] The default parquet reader recognizes (the RowIndex metadata) column and injects a transform to generate row indexes (with appropriate adjustments for any row group skipping that might occur). > > Fixes #919 > > NOTE: If/when arrow-rs parquet reader gains native support for row indexes, e.g. apache/arrow-rs#7307, we should switch to using that. Our solution here is not robust to advanced parquet reader features like page-level skipping. row-level predicate pushdown, etc. ### This PR affects the following public APIs None - the breaking changes were introduced in #1266. ## How was this change tested? New UT. Co-authored-by: Zach Schuermann <[email protected]>
parquet/src/file/metadata/mod.rs
Outdated
| pub fn from_thrift( | ||
| schema_descr: SchemaDescPtr, | ||
| mut rg: RowGroup, | ||
| first_row_number: Option<i64>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@etseidl seems to have refactored quite a lot of metadata / thrift functionality, but I'm wondering, since this function was used directly by delta-rs, and is thus considered an API, shouldn't we avoid changing it, and rather factor out the code that does the core of conversion, and add another entry-point that has first_row_number?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this lands in arrow-57, it can be a breaking change. Otherwise... agree we need to be careful.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I merged main now and resolved these conflicts (next step is addressing the API questions). @jkylling could we make first_row_number non-optional now? Also now I think that it's not a breaking change, when it comes to metadata.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thrilled to see this!
Please let me know if I can help in any way. I can make it my top priority to work on this, as we need to make use of it in the next few weeks.
Our use-case is to leverage this from iceberg-rust, which uses ParquetRecordBatchStreamBuilder. The API seems to work for that, but I understand from other comments that it may not be the most desirable one - happy to help either with research/proposal or with the implementation of the chosen option.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @vustef! I'd be very happy if you want to help get row number support into the Parquet reader, either with this PR or through other alternatives. If you want to pick up this PR I can give you commit rights to the branch? Sadly, I don't have capacity to work on this PR at the moment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @jkylling, yes, please do that if you can, happy to continue where you left.
I'd also need some guidance from @scovich and @alamb on the preferred path forward. And potentially help from @etseidl if I hit a wall with merging metadata changes that happened in the meanwhile (but more on that once I try it out).
| selection: None, | ||
| limit: None, | ||
| offset: None, | ||
| row_number_column: None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also change schema and/or parquet_schema functions to include the row_number_column?
|
@jkylling what tests did you use to run on each push? Wondering about the minimal dev loop I can do before pushing. |
It's been a while since I worked on this, but this is what showed up in my shell history: |
|
@jkylling at some point I'm going to need to change the description of the PR, not sure how best to do that. Also perhaps it'd be better fit for the current stage to be a draft, until the API is agreed upon (although that is not too important). |
|
I agree that before putting too much effort into this PR we agree on the correct way to implement row numbers (I defer to @alamb and others for the arrow side of this). One concern I have with the approach here is how to provide exact row numbers if we start selectively reading row group metadata. If we don't have metadata for all preceding row groups, we can't know the starting row number. This at least argues for reverting back to using an |
I don't think we will be able to provide row numbers if we don't have all the preceding row group metadata Given the main usecase I have heard so far is indexing and delete vectors, which require exact and accurate row numbers, I think it would be better if the reader simply returned an error if it was configured to read row numbers but didn't have enough information to do so |
Sounds good, I'll bring back the I'll hold off with pushing changes here until we finish discussion on the GH issue. And most likely will create a new PR, because I'm not able to update description of this one nor move it between draft and active. (If jkylling doesn't object. Will add due credits to him in there ofc, and keep his commits). |
Which issue does this PR close?
Closes #7299.
What changes are included in this PR?
In this PR we:
ArrowReaderBuilderto set arow_number_columnused to extend the readRecordBatcheswith an additional column with file row numbers.ArrayReaderto the vector ofArrayReaders reading columns from the Parquet file, if therow_number_columnis set in the reader configuration. This is aRowNumberReader, which is a specialArrayReader. It reads no data from the Parquet pages, but uses the first row numbers in theRowGroupMetaDatato keep track of progress.The
RowGroupMetaData::first_row_numberisOption<i64>, since it is possible that the row number is unknown (I encountered an instance of this when trying to integrate this PR in delta-rs), and it's better ifNoneis used instead of some special integer value.The performance impact of this PR should be negligible when the row number column is not set. The only additional overhead would be the tracking of the
first_row_numberof each row group.Are there any user-facing changes?
We add an additional public method:
ArrowReaderBuilder::with_row_number_columnThere are a few breaking changes as we touch a few public interfaces:
RowGroupMetaData::from_thriftandRowGroupMetaData::from_thrift_encryptedtakes an additional parameterfirst_row_number: Optional<i64>.RowGroupshas an additional methodRowGroups::row_groups. Potentially this method could replace theRowGroups::num_rowsmethod or provide a default implementation for it.ParquetError::RowGroupMetaDataMissingRowNumber.I'm very open to suggestions on how to reduce the amount of breaking changes.