Implement arrow-avro Reader and ReaderBuilder #7834
Conversation
# Summary

This commit introduces significant updates to the Avro reader implementation:

* Renamed `ReadOptions` to `ReaderBuilder` for clarity and flexibility.
* Modularized reading and decoding logic by introducing `Reader` and `Decoder` interfaces, improving usability and customization.
* Updated benchmarks, unit tests, and examples to reflect these changes.
* Enhanced documentation and examples to showcase new usage patterns and async capabilities.

# Impact

Improves readability, reusability, and user experience.
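For orientation, here is a minimal usage sketch of the API shape this PR introduces. The crate path, `ReaderBuilder::new()`, and `with_batch_size` are assumptions for illustration (the module is not yet public); only `ReaderBuilder`, `Reader`, and batch-size configuration come from the PR itself.

```rust
use std::fs::File;
use std::io::BufReader;

// Assumed import path; arrow-avro's reader module is not public yet.
use arrow_avro::reader::ReaderBuilder;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Open an Avro object container file behind a BufRead.
    let file = BufReader::new(File::open("data.avro")?);

    // Configure and build a Reader, then iterate decoded RecordBatches.
    let reader = ReaderBuilder::new().with_batch_size(1024).build(file)?;
    for batch in reader {
        let batch = batch?;
        println!("decoded {} rows", batch.num_rows());
    }
    Ok(())
}
```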
scovich
left a comment
Exciting! I think we can significantly improve the reader's control flow, see comments.
arrow-avro/src/reader/mod.rs
Outdated
pub fn build_decoder<R: BufRead>(self, mut reader: R) -> Result<Decoder, ArrowError> {
    let record_decoder = if let Some(schema) = self.schema {
If I'm not mistaken, we could eliminate a lot of duplicated code by defining a private build_impl method that returns both header and decoder:
fn build_impl<R: BufRead>(self, reader: &mut R) -> Result<(Header, Decoder), ArrowError> { ... }

and then build_decoder simplifies to just:

let (_, decoder) = self.build_impl(&mut reader)?;
decoder

while build simplifies to just:

let (header, decoder) = self.build_impl(&mut reader)?;
let compression = header.compression()?;
Ok(Reader { ... })
Aside: The code from build has less redundancy than that of build_decoder, and is probably the better starting point of the two.
@scovich This is a good callout! I went ahead and abstracted this logic into new private build_impl and make_record_decoder methods per your suggestion.
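For readers following along, a rough sketch of the shape that shared helper might take, based only on the snippets in this thread; `read_header` and `header.schema()` are assumed helper/accessor names, not necessarily the committed code:

```rust
fn build_impl<R: BufRead>(self, reader: &mut R) -> Result<(Header, Decoder), ArrowError> {
    // Decode the container-file header first (helper name assumed).
    let header = read_header(reader)?;
    // Prefer a caller-supplied schema; otherwise fall back to the header's schema
    // (accessor name assumed).
    let schema = match self.schema {
        Some(schema) => schema,
        None => header.schema()?,
    };
    // Shared construction of the record decoder and row decoder.
    let record_decoder = self.make_record_decoder(&schema)?;
    Ok((header, Decoder::new(record_decoder, self.batch_size)))
}
```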
arrow-avro/src/reader/mod.rs
Outdated
/// Return an iterator of [`Block`] from the provided [`BufRead`]
fn read_blocks<R: BufRead>(mut reader: R) -> impl Iterator<Item = Result<Block, ArrowError>> {
    let mut decoder = BlockDecoder::default();
impl<R> Reader<R> {
I understand that the two methods defined in this impl block don't depend on the structure of R... but is there any benefit to separating them out like this? It's impossible to do anything useful with a Reader unless R: BufRead?
I guess the real question would be: Why do we not require Reader<R: BufRead>?
@scovich TY for catching that. I made that change as well.
arrow-avro/src/reader/mod.rs
Outdated
if consumed > 0 {
    self.block_data.drain(..consumed);
}
match self.decoder.flush()? {
Seems odd to flush even if no bytes were consumed?
Actually... no bytes consumed guarantees the batch is complete (but it also means we paid to fetch a block we didn't need yet).
But that means we have a problem -- this "tentative" flush call would have the side effect of producing large numbers of tiny batches, no? It seems like we need a way to detect the batch is complete without the side effects of calling flush. For example, the CSV reader provides a (strangely named IMO) capacity method for this purpose. The JSON reader doesn't even try to optimize for this corner case (batch full at block boundary), and so doesn't have this problem.
@scovich Really appreciate you calling this out. The capacity-based approach is much cleaner and more efficient.
I went ahead and added two new methods:
/// Returns the number of rows that can be added to this decoder before it is full.
pub fn capacity(&self) -> usize {
self.batch_size.saturating_sub(self.decoded_rows)
}
/// Returns true if the decoder has reached its capacity for the current batch.
pub fn batch_is_full(&self) -> bool {
self.capacity() == 0
}

and then utilized them the way you recommended in your other comment:
'outer: while !self.finished && !self.decoder.batch_is_full()
arrow-avro/src/reader/mod.rs
Outdated
match maybe_block {
    Some(block) => {
This seems like a good spot for:
let Some(block) = maybe_block else {
self.finished = true;
...
break;
}

(breaking the loop anyway causes the function to return the result of self.decoder.flush())
The refactored read method should cover this enhancement.
arrow-avro/src/reader/mod.rs
Outdated
None => {
    if !self.block_data.is_empty() {
Under what circumstances could flush return None (= no rows available) and yet there are still bytes available to decode? And why would we want to break out of the loop in order to immediately call flush a second time? Shouldn't we rather try to decode more bytes?
I'm having trouble following the state machine here, but I almost wonder if the control flow could be expressed more cleanly like this?
'outer: while !self.finished && !self.decoder.batch_is_full() {
// make sure we have a block to work with
while self.block_cursor == self.block_data.len() {
let buf = self.reader.fill_buf()?;
if buf.is_empty() {
// EOF... hopefully the batch was complete (flush will decide)
self.finished = true;
break 'outer;
}
// Try to decode another block, with three possible outcomes:
// 1. We successfully decode a block => inner loop exits
// 2. We consume at least one byte => inner loop continues (fetch more data)
// 3. We fail to consume any bytes => return an error
let consumed = self.block_decoder.decode(buf)?;
self.reader.consume(consumed);
if let Some(block) = self.block_decoder.flush() {
let block_data = if let Some(ref codec) = self.compression {
codec.decompress(&block.data)?
} else {
block.data
};
self.block_data = block_data;
self.block_cursor = 0;
} else if consumed == 0 {
return Err(ArrowError::ParseError(
"Could not decode next Avro block from partial data".to_string(),
));
}
}
// Try to decode more rows from the block, with three possible outcomes:
// 1. Block empty, incomplete batch => loop continues
// * Next loop iteration will fetch a new block
// 2. Block empty, complete batch => loop exits (flush)
// * Next read will fetch a new block
// 3. Block non-empty => batch must be complete => loop exits (flush)
// * Next read will keep consuming from this same block
let consumed = self.decoder.decode(&self.block_data[self.block_cursor..])?;
self.block_cursor += consumed;
}
self.decoder.flush()
Under what circumstances could flush return None (= no rows available) and yet there are still bytes available to decode?
This could occur at the end of the input stream if the remaining data in self.block_data is insufficient to form a complete Avro record, aka the data is truncated. That being said I should have raised a ParseError, not quietly returned Ok(None).
And why would we want to break out of the loop in order to immediately call flush a second time? Shouldn't we rather try to decode more bytes?
The extra break/flush you called out was an attempt to force one more decode and flush for whatever is leftover after seeing EOF. However, as your snippet shows, it makes the state machine much more complicated than it has to be and was in general a sub-optimal implementation.
I'm having trouble following the state machine here, but I almost wonder if the control flow could be expressed more cleanly like this?
That is a much cleaner way to implement this. Just cleaning this logic up like you called out made a huge impact. I went ahead and adopted it with only one change:
let consumed = self.decoder.decode(&self.block_data[self.block_cursor..])?;
if consumed == 0 && self.block_cursor < self.block_data.len() {
self.block_cursor = self.block_data.len();
} else {
self.block_cursor += consumed;
}

instead of:
let consumed = self.decoder.decode(&self.block_data[self.block_cursor..])?;
self.block_cursor += consumed;I made that change to handle a potential edge case where the decoder is unable to make any progress (consumed == 0) on the current data, even if there's still data left in the block. This could theoretically happen if the data is corrupt or malformed in a way that the decoder can't parse it.
I went ahead and adopted it with only one change... to handle a potential edge case where the decoder is unable to make any progress (consumed == 0) on the current data, even if there's still data left in the block. This could theoretically happen if the data is corrupt or malformed in a way that the decoder can't parse it.
We definitely don't want to enter an infinite loop, but isn't that a pretty blatant error condition that we should surface to the caller, rather than silently dropping data?
Question tho -- do we know of any actual condition where the decoder could fail to make progress without detecting any problem?
I pointed my shiny new AI assistant at the code, and after several rounds of inquiry it came up with the following scenario:
// Malformed block data with empty record schema
let block_data = vec![0x01, 0x02, 0x03]; // Spurious bytes
let empty_schema = /* record with zero fields */;
let mut decoder = RecordDecoder::new(empty_schema);
// This returns Ok(0) - no error, but no progress
let consumed = decoder.decode(&block_data, 1)?; // consumed == 0

What's less clear to me is (a) whether it's even valid to request an empty schema; and (b) what should happen in that case, because it would seem to apply even if the file contains valid data?
(a) whether it's even valid to request an empty schema
It's technically not valid to have an empty schema.
(b) what should happen in that case, because it would seem to apply even if the file contains valid data
Maybe we can do something along the lines of when strict_mode is true we throw an error and when strict_mode is false, we try to work around it before throwing an error?
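For concreteness, that idea might look roughly like the following inside the read loop; `strict_mode` as a field on the reader is an assumption here, mirroring the builder option discussed elsewhere in this PR:

```rust
let consumed = self.decoder.decode(&self.block_data[self.block_cursor..])?;
if consumed == 0 && self.block_cursor < self.block_data.len() {
    if self.strict_mode {
        // Surface the stall to the caller instead of silently dropping the
        // remainder of the block.
        return Err(ArrowError::ParseError(
            "Avro decoder made no progress on remaining block data".to_string(),
        ));
    }
    // Non-strict: skip the rest of the block (the current workaround), at the
    // cost of possible data loss in this rare edge case.
    self.block_cursor = self.block_data.len();
} else {
    self.block_cursor += consumed;
}
```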
arrow-avro/src/reader/mod.rs
Outdated
if !self.block_data.is_empty() {
    let consumed = self.decoder.decode(&self.block_data)?;
    if consumed > 0 {
        self.block_data.drain(..consumed);
This shifts remaining bytes into the gap; is there a way to avoid that cost?
Also a good callout; this should be taken care of now in the refactored read method you recommended, specifically:
let consumed = self.decoder.decode(&self.block_data[self.block_cursor..])?;
arrow-avro/src/reader/mod.rs
Outdated
let read_len = buf.len();
let consumed_len = self.block_decoder.decode(buf)?;
self.reader.consume(consumed_len);
if consumed_len == 0 && read_len != 0 {
Doesn't L359 guarantee that read_len is non-zero?
100%, also a good catch. This redundancy was another symptom of the convoluted control flow in the original implementation and should be resolved now.
@scovich Thank you so much for that thorough review. I made the recommended changes and cleaned the code up per your suggestions. Let me know what you think!
scovich
left a comment
A lot cleaner now. Two main questions remain, about header decoding and a corner case in the reader.
if consumed == 0 && self.block_cursor < self.block_data.len() {
    self.block_cursor = self.block_data.len();
This doesn't seem quite right? If we failed to make progress on a non-empty block and a non-full batch, wouldn't that indicate something is wrong, similar to L359 above? Why would it be correct to just skip the rest of the block and try to keep going?
(part of me wonders if this might be related to the header decoding question on the other comment thread -- are we trying to skip the unwanted header bytes here, in order to reach the data of interest?)
The core of the issue, and the reason for this code, lies in handling a specific edge case allowed by the Avro specification: records that have a zero-byte encoding. This can happen, for instance, with a record that has no fields or contains only fields of the null type.
There is a significant drawback to the current approach: if a data block contains a mix of zero-sized and non-zero-sized records, this logic would cause any records that appear after the first zero-sized one to be skipped, leading to data loss. It's a trade-off that prevents a hang at the cost of potential data loss in this specific and rare edge case. I planned to develop this further in a follow-up PR, before the reader is publicly exposed, via a new records_read counter-based approach. If needed, though, I can add it to this PR as well.
That's... interesting...
- How does the reader determine that they have encountered such a record in the first place, if it occupies zero bytes?
- How does the reader infer the number of zero-byte records the writer intended to produce?
- How does one get "past" the record to read the (presumably normal) records that fill the rest of the block?
How does the reader determine that they have encountered such a record in the first place, if it occupies zero bytes?
The reader "detects" a zero byte record not by reading bytes from the data stream, but by interpreting the Avro schema it was given. The schema would look something like this: {"type": "record", "name": "EmptyRecord", "fields": []}.
How does the reader infer the number of zero-byte records the writer intended to produce?
It doesn't, and that's the core of the problem. The Avro binary format specifies that each data block begins with a count of the records it contains. However, the current Reader implementation does not use this record count to drive the decoding process (I plan on adding that logic soon). Instead, it decodes the raw byte payload of the block (self.block_data) until the buffer is empty. When it encounters the first zero byte record, it consumes 0 bytes and is unable to make progress. It has no way of knowing if the writer intended a zero byte record and if so how many, because there are no bytes in the stream to differentiate them. This ambiguity leads to the infinite loop that the code in question is trying to prevent.
How does one get "past" the record to read the (presumably normal) records that fill the rest of the block?
You can't as this is currently implemented, which is precisely why I added that check as a short term fix. I'll need to add in logic like this:
let record_count = block.num_records; // Read from block header
for _ in 0..record_count {
if self.decoder.batch_is_full() {
// Stop if the Arrow batch is full, but remember our progress
// in this Avro block for the next call to `read()`.
break;
}
let consumed = self.decoder.decode(&self.block_data[self.block_cursor..])?;
self.block_cursor += consumed;
}

I just need to research the best way to go about this in a way that handles all the edge cases. I'm planning to follow up on this work (before it's public) to both resolve this scenario and enable schema evolution. I just didn't want this PR to get too large and complicated, but I'm also happy to add the logic here as well.
Also, @tustvold, I'm not sure what your original vision was for this reader. I wanted to make sure I keep true to that as well.
I'm afraid I don't have the capacity to review at the moment, or realistically for the foreseeable future, however, the major goal of the structure of the reader was to make it possible to read in a stream oriented fashion - a la the CSV or JSON decoders. This makes it possible to use with undelimited data streams from the likes of object_store.
The reader "detects" a zero byte record not by reading bytes from the data stream, but by interpreting the Avro schema it was given. The schema would look something like this:
{"type": "record", "name": "EmptyRecord", "fields": []}.
@jecsand838 thanks a lot for the context. This almost sounds like a flaw in the avro spec, if there's no way to consume bytes after an empty record? I'd be curious how other implementations handle it?
Also: what happens if the reader provides a schema with fewer fields than the file's schema? Is the data layout self-describing enough that the decoder can reliably detect the extra bytes and either skip them or blow up?
Asking because (according to my not-super-trustworthy AI coding assistant) there seem to be several cases here:
- The reader asked for an empty schema, even tho the file contains data.
- This should either blow up due to schema mismatch or (more likely) just return however many empty rows the block contained.
- It is correct to ignore all bytes in the block, because we don't want the columns they encode
- But wouldn't the decoder have anyway consumed them as part of handling extra/unwanted columns in general?
- The writer produced an empty schema.
- This should return however many empty rows the block contains
- The block should not contain any bytes, because there was nothing to encode
- Seems like we should blow up if spurious bytes are present, unless the avro spec says otherwise?
- [AI assistant claim] Nulls occupy zero bytes, and so an all-null row would occupy zero bytes even if the schema is non-empty.
- I question this one because there has to be some way of knowing the row (or its values) are all-NULL, and that information would have to come from the byte stream the decoder is consuming?
@scovich 100% and the Avro spec can definitely be tricky and nuanced.
if there's no way to consume bytes after an empty record? I'd be curious how other implementations handle it?
From my research the answer seems to be that no extra bytes exist after an “empty record” in Avro. It seems like what the code should do is simply count that it has seen one record and move on.
In the Java implementation GenericDatumReader.readRecord iterates over the field list; with zero fields the loop body never executes, so no calls are made and the input position stays where it is. The outer caller still decrements the blockRemaining counter, so the Decoder read logic progresses correctly.
what happens if the reader provides a schema with fewer fields than the file's schema?
If the schema used to write the data (in this instance the file's schema) cannot be resolved with the reader's schema per the avro specification's schema resolution requirements then we should return an error for every record written using that invalid schema imo. Handling the error would then be up to the caller. In the scenario you called out, the missing fields would be ignored and no error returned per the specification: if the writer’s record contains a field with a name not present in the reader’s record, the writer’s value for that field is ignored. (Remember that an Avro schema is just a Record type with the fields of each row defined as Avro Record Fields in the schema Record.)
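As a concrete illustration of that resolution rule (these schemas are examples supplied here, not taken from this PR):

```rust
// Writer schema has an extra field `b`; a reader resolving against the
// narrower schema simply ignores the writer's value for `b`, per the Avro
// schema resolution rule quoted above.
const WRITER_SCHEMA: &str = r#"{"type":"record","name":"R","fields":[
    {"name":"a","type":"long"},
    {"name":"b","type":"string"}
]}"#;
const READER_SCHEMA: &str = r#"{"type":"record","name":"R","fields":[
    {"name":"a","type":"long"}
]}"#;
```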
Is the data layout self-describing enough that the decoder can reliably detect the extra bytes and either skip them or blow up?
It should be imo. From looking over the Java implementation it seems the writer schema either needs to be:
- Explicitly provided: https://github.com/apache/avro/blob/3a9e5a789b5165e0c8c4da799c387fdf84bfb75e/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageDecoder.java#L122
- Provided in the first row (Header) of the stream: https://github.com/apache/avro/blob/3a9e5a789b5165e0c8c4da799c387fdf84bfb75e/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java#L142
Or a MissingSchemaException is thrown: https://github.com/apache/avro/blob/3a9e5a789b5165e0c8c4da799c387fdf84bfb75e/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageDecoder.java#L42
Thanks! Based on all that, it sounds like the current PR is mostly correct in its handling of empty records... with the future improvement to return the correct number of empty records before dropping the block?
@scovich That's correct.
match self.schema {
    Some(ref schema) => {
        let record_decoder = self.make_record_decoder(schema)?;
        Ok(Decoder::new(record_decoder, self.batch_size))
Coming from the parquet world, I'm a bit surprised the decoder doesn't need access to the header?
But looking at this code, the header only seems to provide schema and compression info?
And the decoder leaves compression to the reader for some reason?
Actually, a bigger question -- the same reader provides the header and the data. Does the caller have to keep track of whether the header was already consumed?
- If this method already consumed it, can they re-read it some other way later?
- If this method did not consume it, will that confuse the block decoder?
Coming from the parquet world, I'm a bit surprised the decoder doesn't need access to the header?
But looking at this code, the header only seems to provide schema and compression info?
And the decoder leaves compression to the reader for some reason?
You are correct. In the Avro container file format, the header contains the file schema and the compression codec used for the data blocks. The rest of the file is a series of blocks, each containing some number of records. The Reader is a high-level component that handles the file-level concerns: it reads and parses the header, and then it reads each data block, decompressing it if necessary.
The Decoder is a lower-level component that only knows how to decode a stream of uncompressed Avro record data. This separation of concerns makes the Decoder more flexible, for example, allowing it to be used in async contexts where data arrives in chunks, as shown in the documentation's decode_stream example.
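To illustrate that split, here is a sketch of how a caller that has already skipped the header and decompressed a block might drive the `Decoder`, using only the `decode`/`flush`/`batch_is_full` methods referenced in this review; exact signatures and the import path are assumptions:

```rust
use arrow_array::RecordBatch;
use arrow_schema::ArrowError;
// Assumed import path; the reader module is not public yet.
use arrow_avro::reader::Decoder;

/// Feed one uncompressed Avro block to the decoder, emitting any full batches.
fn drain_block(
    decoder: &mut Decoder,
    mut data: &[u8],
    out: &mut Vec<RecordBatch>,
) -> Result<(), ArrowError> {
    while !data.is_empty() {
        // `consumed` may stop short of `data.len()` if the current batch fills up.
        let consumed = decoder.decode(data)?;
        data = &data[consumed..];
        if decoder.batch_is_full() {
            if let Some(batch) = decoder.flush()? {
                out.push(batch);
            }
        }
        if consumed == 0 {
            // No forward progress is possible on the remaining bytes.
            break;
        }
    }
    Ok(())
}
```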
Actually, a bigger question -- the same reader provides the header and the data. Does the caller have to keep track of whether the header was already consumed?
If this method already consumed it, can they re-read it some other way later?
If this method did not consume it, will that confuse the block decoder?
The behavior depends on whether you provide a schema:
- If you provide a schema (the code snippet you commented on), build_decoder does not read from the input reader. It assumes the caller will handle the Avro container format themselves (i.e., skip the header and decompress blocks before passing the data to the Decoder). If you were to feed the header bytes to the Decoder created this way, it would indeed fail to parse them. This path is intended for advanced, low-level use cases.
- If you do NOT provide a schema, build_decoder reads from the reader to parse the header and discover the schema. The header is consumed from the reader in a forward-only manner, so you cannot re-read it unless the underlying stream supports Seek. After this call, you would start feeding the subsequent data blocks to the returned Decoder.
So, the caller of build_decoder must be aware of whether the header has been consumed. For most situations, the higher-level Reader is the recommended API, as it automatically handles all this state management for you.
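A hedged sketch of the two paths described above; `ReaderBuilder::new()` and the variable names are illustrative only, while `with_schema` and `build_decoder` are the methods discussed in this thread:

```rust
// Path 1: caller supplies the schema. build_decoder performs no I/O, so the
// caller must skip the container header and decompress blocks before feeding
// bytes to the Decoder.
let decoder = ReaderBuilder::new()
    .with_schema(my_schema)
    .build_decoder(block_bytes_source)?;

// Path 2: no schema supplied. The header is consumed from the input here to
// discover the schema; only decompressed block bytes should follow.
let decoder = ReaderBuilder::new().build_decoder(avro_file)?;
```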
For follow-up work, we could also add more explicit support for Avro's schema evolution capabilities. Currently, the ReaderBuilder allows a user to provide a schema, which will be used for decoding. A more advanced implementation would take this user-provided schema as the "reader's schema" and use the schema from the file header as the "writer's schema". The decoder could then resolve the data according to Avro's schema evolution rules, safely handling things like field aliasing, additions, and reordering. This would significantly improve the crate's robustness in production environments where schemas often change over time.
Ah, so the provided schema is taken at face value today, with no attempt to reconcile it against the file's actual schema? What happens if they disagree in matters large or small?
Meanwhile -- it sounds like whoever creates a decoder must read the header first and provide the schema. Because otherwise they lose access to the compression codec, which readers are apparently responsible to deal with rather than decoders. That said, I never quite understood why the decoder's flush method doesn't handle compression?
Fair point about keeping them separate for loose coupling and optimization reasons. It's just weird that it's so easy to "wedge" yourself by not providing an explicit schema, which consumes the header in order to infer one, which makes it impossible to determine the compression codec you should use without re-reading the header.
Put another way: It looks to me like the decoder shouldn't be in the business of reading headers, ever. Whoever creates the decoder should have already consumed the header (in order to learn compression codec and possibly the schema), and that caller is responsible to provide a schema they deem appropriate (which may or may not have come from the header). As a bonus, any schema reconciliation would also be the caller's responsibility, and the decoder can focus on its job of decoding blocks.
Alternatively, we could follow the example of the parquet reader builder. It has a new_with_metadata constructor that allows the caller to provide the parquet footer, and also has a metadata method that allows the caller to access the parquet footer (regardless of whether it was provided or read internally).
However, reading an avro header seems a lot simpler than reading a parquet footer... and I don't think there's any (easy?) way to guess how big an avro file's header might be, so we'd have to consume it even if we don't care what it says. That pushes me toward the first (simpler) suggestion, that caller consumes the header and decoder just does what it's told.
It's just weird that it's so easy to "wedge" yourself by not providing an explicit schema, which consumes the header in order to infer one, which makes it impossible to determine the compression codec you should use without re-reading the header.
When dealing with the object container file format, i.e. your standard .avro files, the header has to be there and all records must match the schema in the header: https://avro.apache.org/docs/1.11.1/specification/#object-container-files. In the event with_schema is used AND we're reading from an object container file, the schema from with_schema would be the reader schema and the schema from the file's header would be the writer schema. In that event, or more generally whenever we have different but compatible (per the specification) reader and writer schemas, part of decoding each row involves mapping from the schema used to write the data to the shape the data must take to be read into Arrow.
To be clear, that logic is not currently in place because we aren't ready to implement schema resolution just yet, so some of the current code, such as:
match self.schema {
Some(ref schema) => {
let record_decoder = self.make_record_decoder(schema)?;
Ok(Decoder::new(record_decoder, self.batch_size))
}
None => {
let (_, decoder) = self.build_impl(&mut reader)?;
Ok(decoder)
}
}

is placeholder logic.
Also, I'll add that a compliant and mature Avro decoder is expected to handle schema resolution. It's one of the most powerful and fundamental features of the Avro format, and it's certainly one of the biggest reasons I'm working on this contribution and not using another format. So we will definitely want it completed before marking the arrow-avro reader as public. I just need to finish and merge the PRs for field metadata, "out of spec" Impala support, and the remaining types, such as Duration, first.
a new_with_metadata constructor that allows the caller to provide the parquet footer, and also has a metadata method that allows the caller to access the parquet footer (regardless of whether it was provided or read internally)
That's not a bad idea tbh. In scenarios where we are decoding streaming blocks (not files), maybe we can use a function like that to inform the writer's schema while the with_schema method still provides the reader's schema. From there maybe we can even support the caller changing the writer's schema as the streaming data comes in. Usually this scenario will arise when decoding rows coming in via protocol wire format: https://avro.apache.org/docs/1.11.1/specification/#protocol-wire-format. I don't think the decoder should support implicitly inferring a schema to be clear. It should be the responsibility of the caller to identify when the writer schema has changed and to call a method that updates the decoder imo.
I don't think the decoder should support implicitly inferring a schema to be clear. It should be the responsibility of the caller to identify when the writer schema has changed and to call a method that updates the decoder
How would the caller do that, out of curiosity? A quick skim of the wire protocol didn't turn up anything obvious. Does the sender embed a new header+schema that the caller should notice and respond to as they see fit?
.unwrap();
remaining -= to_read;
fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
This seems like a competent async reader implementation... any reason it shouldn't be part of the public API?
Ah -- because not everyone wants to use the futures crate
scovich
left a comment
Good milestone! 🚢
alamb
left a comment
Thank you @jecsand838 @scovich and @tustvold
In my mind this PR is a good improvement and a step in the right direction.
I didn't quite follow all the nuance about schema resolution, etc. but it seems like it is well in hand
Let me know if there are any concerns about merging this; otherwise I'll do it over the next day or two
Thanks again
arrow-avro/src/codec.rs
Outdated
pub struct AvroField {
    name: String,
    data_type: AvroDataType,
    pub(crate) data_type: AvroDataType,
why is this `pub(crate)` needed? Not a big deal but it seems unnecessary
That's a good catch! I had changed it by accident while setting up the tests and forgot to change it back. Just pushed up the corrected code for this.
use crate::reader::block::{Block, BlockDecoder};
use crate::reader::header::{Header, HeaderDecoder};
use arrow_schema::ArrowError;
//! Avro reader
This interface is really nice 👍
pub fn try_new_with_options(
    data_type: &AvroDataType,
    options: ReadOptions,
    use_utf8view: bool,
I was somewhat confused at first about this change as I thought it made the API harder to extend
Then I realized that RecordDecoder is not pub anymore
So maybe we could mark these methods and the struct as pub(crate) to be explicit that it is not re-exported outside
Another good callout. I just pushed up those changes marking RecordDecoder and its methods as pub(crate).
…r` and its methods from `pub` to `pub(crate)`.
Co-authored-by: Andrew Lamb <[email protected]>
Introduced `RecordDecoderBuilder` to enable more flexible `RecordDecoder` configuration, including support for `utf8_view` and `strict_mode`. Updated `try_new` to use the builder with default settings.
Co-authored-by: Ryan Johnson <[email protected]>
The caller can do this by being signaled to either read a new .avsc file, retrieve an updated schema definition from a schema registry, or attempt to decode an Avro header from the sender (which I think happens). We can definitely add utility functions to assist with this as needed and/or make the …
🚀
# Which issue does this PR close?

- Part of #4886
- Follow up to #7834

# Rationale for this change

The initial Avro reader implementation contained an under-developed and temporary safeguard to prevent infinite loops when processing records that consumed zero bytes from the input buffer. When the `Decoder` reported that zero bytes were consumed, the `Reader` would advance its cursor to the end of the current data block. While this successfully prevented an infinite loop, it had the critical side effect of silently discarding any remaining data in that block, leading to potential data loss.

This change enhances the decoding logic to handle these zero-byte values correctly, ensuring that the `Reader` makes proper progress without dropping data and without risking an infinite loop.

# What changes are included in this PR?

- **Refined Decoder Logic**: The `Decoder` has been updated to accurately track and report the number of bytes consumed for all values, including valid zero-length records like `null` or empty `bytes`. This ensures the decoder always makes forward progress.
- **Removal of Data-Skipping Safeguard**: The logic in the `Reader` that previously advanced to the end of a block on a zero-byte read has been removed. The reader now relies on the decoder to report accurate consumption and advances its cursor incrementally and safely.
- **New integration test** using a temporary `zero_byte.avro` file created via this Python script: https://gist.github.com/jecsand838/e57647d0d12853f3cf07c350a6a40395

# Are these changes tested?

Yes, a new `test_read_zero_byte_avro_file` test was added that reads the new `zero_byte.avro` file and confirms the update.

# Are there any user-facing changes?

N/A

# Follow-Up PRs

1. PR to update `test_read_zero_byte_avro_file` once apache/arrow-testing#109 is merged in.
…Resolution (#8006)

# Which issue does this PR close?

- Part of #4886
- Follow up to #7834

# Rationale for this change

Apache Avro's [single object encoding](https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding) prefixes every record with the marker `0xC3 0x01` followed by a `Rabin` [schema fingerprint](https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints) so that readers can identify the correct writer schema without carrying the full definition in each message. While the current `arrow-avro` implementation can read container files, it cannot ingest these framed messages or handle streams where the writer schema changes over time. The Avro specification recommends computing a 64-bit CRC-64-AVRO (Rabin) hashed fingerprint of the [parsed canonical form of a schema](https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas) to look up the `Schema` from a local schema store or registry.

This PR introduces **`SchemaStore`** and **fingerprinting** to enable:

* **Zero-copy schema identification** for decoding streaming Avro messages published in single-object format (i.e. Kafka, Pulsar, etc.) into Arrow.
* **Dynamic schema evolution** by laying the foundation to resolve writer/reader schema differences on the fly.

**NOTE:** Schema Resolution support in `Codec` and `RecordDecoder` is coming in the next PR.

# What changes are included in this PR?

| Area | Highlights |
| --- | --- |
| **`reader/mod.rs`** | Decoder now detects the `C3 01` prefix, extracts the fingerprint, looks up the writer schema in a `SchemaStore`, and switches to an LRU-cached `RecordDecoder` without interrupting streaming; supports `static_store_mode` to skip the 2-byte peek for high-throughput fixed-schema pipelines. |
| **`ReaderBuilder`** | New builder configuration methods: `.with_writer_schema_store`, `.with_active_fingerprint`, `.with_static_store_mode`, `.with_reader_schema`, `.with_max_decoder_cache_size`, with rigorous validation to prevent misconfiguration. |
| **Unit tests** | New tests covering fingerprint generation, store registration/lookup, schema switching, unknown-fingerprint errors, and interaction with UTF8-view decoding. |
| **Docs & Examples** | Extensive inline docs with examples on all new public methods / structs. |

# Are these changes tested?

Yes. New tests cover:

1. **Fingerprinting** against the canonical examples from the Avro spec
2. **`SchemaStore` behavior**: deduplication, duplicate registration, and lookup.
3. **Decoder fast-path** with `static_store_mode=true`, ensuring the prefix is treated as payload, the 2-byte peek is skipped, and no schema switch is attempted.

# Are there any user-facing changes?

N/A

# Follow-Up PRs

1. Implement Schema Resolution Functionality in Codec and RecordDecoder
2. Add ID `Fingerprint` variant on `SchemaStore` for Confluent Schema Registry compatibility
3. Improve arrow-avro errors + add more benchmarks & examples to prepare for public release

---------

Co-authored-by: Ryan Johnson <[email protected]>
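A speculative sketch of how the builder options listed in that table might fit together; `SchemaStore::new`, `register`, and the `build_decoder()` call shape are hypothetical placeholders, and only the `with_*` method names come from the PR description:

```rust
// Hypothetical setup: register the writer schema and keep its fingerprint.
let mut store = SchemaStore::new(); // constructor name assumed
let writer_fingerprint = store.register(writer_schema)?; // method name assumed

// Configure a streaming decoder that can resolve single-object-encoded records.
let decoder = ReaderBuilder::new()
    .with_reader_schema(reader_schema)
    .with_writer_schema_store(store)
    .with_active_fingerprint(writer_fingerprint)
    .with_max_decoder_cache_size(16)
    .build_decoder()?; // call shape assumed
```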
Which issue does this PR close?
Part of #4886
Rationale for this change
This PR refactors the Avro reader's public API to provide an ergonomic and idiomatic experience.
The new API provides a high-level and fluent
Reader and ReaderBuilder, making it more consistent with other readers in the ecosystem.

What changes are included in this PR?
The core of this PR is the introduction of a struct-based API design for the arrow-avro reader.
- Reader<R: BufRead>: The main addition is a high-level struct that implements the standard Iterator and RecordBatchReader traits. This encapsulates the entire reading process, allowing users to simply iterate over the reader.
- ReaderBuilder: This provides a fluent interface for configuring and constructing a Reader. It simplifies the setup for options such as batch_size, strict_mode, use_utf8view, and with_schema.
- Decoder: For more advanced, asynchronous use cases, a streaming decoder has been introduced. It's designed to be used in streaming contexts where Avro data arrives in chunks.

Are these changes tested?
Yes. The existing tests, such as
test_alltypes and test_utf8view_support, have been refactored to use the new API. This refactor validates the new abstractions. Additionally, new tests have been added to cover the streaming functionality, ensuring it works correctly.

Are there any user-facing changes?
N/A
Follow-Up PRs
- arrow-avro Compression types.
- strict_mode on arrow-avro ReaderBuilder for Impala Avro support.
- arrow-avro Reader Integration Tests.
- arrow-avro Empty Record Decoding.
- arrow-avro Decoder Schema Resolution. (This one could be broken up depending on size, and the arrow-avro Reader should be ready to go public upon completion.)