Conversation

Contributor

@jecsand838 jecsand838 commented Jul 1, 2025

Which issue does this PR close?

Part of #4886

Rationale for this change

This PR refactors the Avro reader's public API to provide an ergonomic and idiomatic experience.
The new API provides a high-level and fluent Reader and ReaderBuilder, making it more consistent with other readers in the ecosystem.

What changes are included in this PR?

The core of this PR is the introduction of a struct-based API design for the arrow-avro reader.

  • Reader<R: BufRead>: The main addition is a high-level struct that implements the standard Iterator and RecordBatchReader traits. This encapsulates the entire reading process, allowing users to simply iterate over the reader (see the usage sketch below).
  • ReaderBuilder: This provides a fluent interface for configuring and constructing a Reader. It simplifies the setup for options such as batch_size, strict_mode, use_utf8view, and with_schema.
  • Streaming Decoder: For more advanced, asynchronous use cases, a streaming Decoder has been introduced. It is designed for contexts where Avro data arrives in chunks.
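
A minimal usage sketch of the new API (the module path and builder method names are assumptions based on the options listed above, so the exact signatures may differ):

```rust
use std::fs::File;
use std::io::BufReader;

use arrow_avro::reader::ReaderBuilder; // path assumed; arrow-avro is not yet public

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // The Reader expects a BufRead over an Avro object container file.
    let file = BufReader::new(File::open("example.avro")?);

    // Configure and build the high-level Reader (builder method names are
    // assumptions based on the batch_size / use_utf8view options above).
    let reader = ReaderBuilder::new()
        .with_batch_size(1024)
        .with_utf8_view(true)
        .build(file)?;

    // Reader implements Iterator<Item = Result<RecordBatch, ArrowError>>,
    // so decoded batches can be consumed in a plain for loop.
    for batch in reader {
        let batch = batch?;
        println!("decoded {} rows", batch.num_rows());
    }
    Ok(())
}
```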

Are these changes tested?

Yes. The existing tests, such as test_alltypes and test_utf8view_support, have been refactored to use the new API, which validates the new abstractions. Additionally, new tests have been added to cover the streaming functionality, ensuring it works correctly.

Are there any user-facing changes?

N/A

Follow-Up PRs

  1. Add Remaining arrow-avro Compression types.
  2. Complete strict_mode on arrow-avro ReaderBuilder For Impala Avro Support
  3. Add Remaining arrow-avro Reader Integration Tests.
  4. Complete arrow-avro Empty Record Decoding.
  5. Implement arrow-avro Decoder Schema Resolution. (This one could be broken up depending on size; the arrow-avro Reader should be ready to go public upon completion.)

…Builder

# Summary

This commit introduces significant updates to the Avro reader implementation:

* Renamed `ReadOptions` to `ReaderBuilder` for clarity and flexibility.
* Modularized reading and decoding logic by introducing `Reader` and `Decoder` interfaces, improving usability and customization.
* Updated benchmarks, unit tests, and examples to reflect these changes.
* Enhanced documentation and examples to showcase new usage patterns and async capabilities.

# Impact

Improves readability, reusability, and user experience.
@github-actions bot added the arrow (Changes to the arrow crate) label on Jul 1, 2025
Contributor

@scovich scovich left a comment

Exciting! I think we can significantly improve the reader's control flow, see comments.

Comment on lines 282 to 283
pub fn build_decoder<R: BufRead>(self, mut reader: R) -> Result<Decoder, ArrowError> {
let record_decoder = if let Some(schema) = self.schema {
Contributor

If I'm not mistaken, we could eliminate a lot of duplicated code by defining a private build_impl method that returns both header and decoder:

fn build_impl<R: BufRead>(self, reader: &mut R) -> Result<(Header, Decoder), ArrowError> {

... and then build_decoder simplifies to just:

let (_, decoder) = self.build_impl(&mut reader)?;
Ok(decoder)

while build simplifies to just:

let (header, decoder) = self.build_impl(&mut reader)?;
let compression = header.compression()?;
Ok(Reader { ... })

Contributor

Aside: The code from build has less redundancy than that of build_decoder, and is probably the better starting point of the two.

Contributor Author

@scovich This is a good callout! I went ahead and abstracted this logic into new private build_impl and make_record_decoder methods per your suggestion.

/// Return an iterator of [`Block`] from the provided [`BufRead`]
fn read_blocks<R: BufRead>(mut reader: R) -> impl Iterator<Item = Result<Block, ArrowError>> {
let mut decoder = BlockDecoder::default();
impl<R> Reader<R> {
Contributor

I understand that the two methods defined in this impl block don't depend on the structure of R... but is there any benefit to separating them out like this? It's impossible to do anything useful with a Reader unless R: BufRead?

I guess the real question would be: why do we not require Reader<R: BufRead>?

Contributor Author

@scovich TY for catching that. I made that change as well.

if consumed > 0 {
self.block_data.drain(..consumed);
}
match self.decoder.flush()? {
Contributor

Seems odd to flush even if no bytes were consumed?

Contributor

@scovich scovich Jul 2, 2025

Actually... no bytes consumed guarantees the batch is complete (but it also means we paid to fetch a block we didn't need yet).

But that means we have a problem -- this "tentative" flush call would have the side effect of producing large numbers of tiny batches, no? It seems like we need a way to detect the batch is complete without the side effects of calling flush. For example, the CSV reader provides a (strangely named IMO) capacity method for this purpose. The JSON reader doesn't even try to optimize for this corner case (batch full at block boundary), and so doesn't have this problem.

Contributor Author

@scovich Really appreciate you calling this out. The capacity based approach is much cleaner and more efficient.

I went ahead and added two new methods:

    /// Returns the number of rows that can be added to this decoder before it is full.
    pub fn capacity(&self) -> usize {
        self.batch_size.saturating_sub(self.decoded_rows)
    }

    /// Returns true if the decoder has reached its capacity for the current batch.
    pub fn batch_is_full(&self) -> bool {
        self.capacity() == 0
    }

and then utilized them the way you recommended in your other comment:

'outer: while !self.finished && !self.decoder.batch_is_full()

Comment on lines 373 to 374
match maybe_block {
Some(block) => {
Contributor

This seems like a good spot for:

let Some(block) = maybe_block else {
    self.finished = true;
      ...
    break;
};

(breaking the loop anyway causes the function to return the result of self.decoder.flush())

Contributor Author

The refactored read method should cover this enhancement.

Comment on lines 347 to 348
None => {
if !self.block_data.is_empty() {
Contributor

Under what circumstances could flush return None (= no rows available) and yet there are still bytes available to decode? And why would we want to break out of the loop in order to immediately call flush a second time? Shouldn't we rather try to decode more bytes?

Contributor

I'm having trouble following the state machine here, but I almost wonder if the control flow could be expressed more cleanly like this?

'outer: while !self.finished && !self.decoder.batch_is_full() {
    // make sure we have a block to work with
    while self.block_cursor == self.block_data.len() {
        let buf = self.reader.fill_buf()?;
        if buf.is_empty() {
            // EOF... hopefully the batch was complete (flush will decide)
            self.finished = true;
            break 'outer;
        }

        // Try to decode another block, with three possible outcomes:
        // 1. We successfully decode a block => inner loop exits
        // 2. We consume at least one byte => inner loop continues (fetch more data)
        // 3. We fail to consume any bytes => return an error
        let consumed = self.block_decoder.decode(buf)?;
        self.reader.consume(consumed);
        if let Some(block) = self.block_decoder.flush() {
            let block_data = if let Some(ref codec) = self.compression {
                codec.decompress(&block.data)?
            } else {
                block.data
            };
            self.block_data = block_data;
            self.block_cursor = 0;
        } else if consumed == 0 {
            return Err(ArrowError::ParseError(
                "Could not decode next Avro block from partial data".to_string(),
            ));
        }
    }
    
    // Try to decode more rows from the block, with three possible outcomes:
    // 1. Block empty, incomplete batch => loop continues
    //    * Next loop iteration will fetch a new block
    // 2. Block empty, complete batch => loop exits (flush)
    //    * Next read will fetch a new block
    // 3. Block non-empty => batch must be complete => loop exits (flush)
    //    * Next read will keep consuming from this same block
    let consumed = self.decoder.decode(&self.block_data[self.block_cursor..])?;
    self.block_cursor += consumed;
}

self.decoder.flush()

Contributor Author

@scovich

Under what circumstances could flush return None (= no rows available) and yet there are still bytes available to decode?

This could occur at the end of the input stream if the remaining data in self.block_data is insufficient to form a complete Avro record, aka the data is truncated. That being said I should have raised a ParseError, not quietly returned Ok(None).

And why would we want to break out of the loop in order to immediately call flush a second time? Shouldn't we rather try to decode more bytes?

The extra break/flush you called out was an attempt to force one more decode and flush for whatever is leftover after seeing EOF. However, as your snippet shows, it makes the state machine much more complicated than it has to be and was in general a sub-optimal implementation.

I'm having trouble following the state machine here, but I almost wonder if the control flow could be expressed more cleanly like this?

That is a much cleaner way to implement this. Just cleaning this logic up like you called out made a huge impact. I went ahead and adopted it with only one change:

            let consumed = self.decoder.decode(&self.block_data[self.block_cursor..])?;
            if consumed == 0 && self.block_cursor < self.block_data.len() {
                self.block_cursor = self.block_data.len();
            } else {
                self.block_cursor += consumed;
            }

instead of:

    let consumed = self.decoder.decode(&self.block_data[self.block_cursor..])?;
    self.block_cursor += consumed;

I made that change to handle a potential edge case where the decoder is unable to make any progress (consumed == 0) on the current data, even if there's still data left in the block. This could theoretically happen if the data is corrupt or malformed in a way that the decoder can't parse it.

Contributor

I went ahead and adopted it with only one change... to handle a potential edge case where the decoder is unable to make any progress (consumed == 0) on the current data, even if there's still data left in the block. This could theoretically happen if the data is corrupt or malformed in a way that the decoder can't parse it.

We definitely don't want to enter an infinite loop, but isn't that a pretty blatant error condition that we should surface to the caller, rather than silently dropping data?

Contributor

Question tho -- do we know of any actual condition where the decoder could fail to make progress without detecting any problem?

Contributor

I pointed my shiny new AI assistant at the code, and after several rounds of inquiry it came up with the following scenario:

// Malformed block data with empty record schema
let block_data = vec![0x01, 0x02, 0x03];  // Spurious bytes
let empty_schema = /* record with zero fields */;
let mut decoder = RecordDecoder::new(empty_schema);

// This returns Ok(0) - no error, but no progress
let consumed = decoder.decode(&block_data, 1)?;  // consumed == 0

What's less clear to me is (a) whether it's even valid to request an empty schema; and (b) what should happen in that case, because it would seem to apply even if the file contains valid data?

Contributor Author

@jecsand838 jecsand838 Jul 8, 2025

@scovich

(a) whether it's even valid to request an empty schema

It's technically not valid to have an empty schema.

(b) what should happen in that case, because it would seem to apply even if the file contains valid data

Maybe we can do something along the lines of: when strict_mode is true we throw an error, and when strict_mode is false we try to work around it before throwing an error?

if !self.block_data.is_empty() {
let consumed = self.decoder.decode(&self.block_data)?;
if consumed > 0 {
self.block_data.drain(..consumed);
Contributor

This shifts remaining bytes into the gap; is there a way to avoid that cost?

Contributor Author

Also a good callout; this should be taken care of now in the refactored read method you recommended, specifically:
let consumed = self.decoder.decode(&self.block_data[self.block_cursor..])?;

let read_len = buf.len();
let consumed_len = self.block_decoder.decode(buf)?;
self.reader.consume(consumed_len);
if consumed_len == 0 && read_len != 0 {
Contributor

Doesn't L359 guarantee that read_len is non-zero?

Contributor Author

100%, also a good catch. This redundancy was another symptom of the convoluted control flow in the original implementation and should be resolved now.

@jecsand838 jecsand838 force-pushed the avro-reader-implementation branch from f50c684 to 2a2cced Compare July 3, 2025 03:03
@jecsand838
Contributor Author

@scovich Thank you so much for that thorough review. I made the recommended changes and cleaned the code up per your suggestions. Let me know what you think!

@jecsand838 jecsand838 requested a review from scovich July 3, 2025 03:21
Contributor

@scovich scovich left a comment

A lot cleaner now. Two main questions remain, about header decoding and a corner case in the reader.

Comment on lines +368 to +369
if consumed == 0 && self.block_cursor < self.block_data.len() {
self.block_cursor = self.block_data.len();
Contributor

This doesn't seem quite right? If we failed to make progress on a non-empty block and a non-full batch, wouldn't that indicate something is wrong, similar to L359 above? Why would it be correct to just skip the rest of the block and try to keep going?

Contributor

(part of me wonders if this might be related to the header decoding question on the other comment thread -- are we trying to skip the unwanted header bytes here, in order to reach the data of interest?)

Contributor Author

The core of the issue, and the reason for this code, lies in handling a specific edge case allowed by the Avro specification: records that have a zero-byte encoding. This can happen, for instance, with a record that has no fields or contains only fields of the null type.

There is a significant drawback to the current approach: if a data block contains a mix of zero-sized and non-zero-sized records, this logic would cause any records that appear after the first zero-sized one to be skipped, leading to data loss. It's a trade-off that prevents a hang at the cost of potential data loss in this specific and rare edge case. I planned to develop this further in a follow-up PR, using a new records_read counter-based approach, before the reader is publicly exposed. If needed, though, I can add it to this PR as well.

Contributor

That's... interesting...

  • How does the reader determine that they have encountered such a record in the first place, if it occupies zero bytes?
  • How does the reader infer the number of zero-byte records the writer intended to produce?
  • How does one get "past" the record to read the (presumably normal) records that fill the rest of the block?

Contributor Author

How does the reader determine that they have encountered such a record in the first place, if it occupies zero bytes?

The reader "detects" a zero byte record not by reading bytes from the data stream, but by interpreting the Avro schema it was given. The schema would look something like this: {"type": "record", "name": "EmptyRecord", "fields": []}.

How does the reader infer the number of zero-byte records the writer intended to produce?

It doesn't, and that's the core of the problem. The Avro binary format specifies that each data block begins with a count of the records it contains. However, the current Reader implementation does not use this record count to drive the decoding process (I plan on adding that logic soon). Instead, it decodes the raw byte payload of the block (self.block_data) until the buffer is empty. When it encounters the first zero byte record, it consumes 0 bytes and is unable to make progress. It has no way of knowing if the writer intended a zero byte record and if so how many, because there are no bytes in the stream to differentiate them. This ambiguity leads to the infinite loop that the code in question is trying to prevent.

How does one get "past" the record to read the (presumably normal) records that fill the rest of the block?

You can't as currently implemented, which is precisely why I added that check as a short-term fix. I'll need to add logic like this:

let record_count = block.num_records; // Read from block header
for _ in 0..record_count {
    if self.decoder.batch_is_full() {
        // Stop if the Arrow batch is full, but remember our progress
        // in this Avro block for the next call to `read()`.
        break;
    }
    
    let consumed = self.decoder.decode(&self.block_data[self.block_cursor..])?;
    self.block_cursor += consumed;
}

I just need to research the best way to go about this that solves all edge cases. I'm planning to follow up on this work (before it's public) to both resolve this scenario and enable schema evolution. I just didn't want this PR to get too large and complicated, but I'm also happy to add the logic in here as well.

Also @tustvold, I'm not sure what your original vision was for this reader. I want to make sure I keep true to that as well.

Contributor

I'm afraid I don't have the capacity to review at the moment, or realistically for the foreseeable future. However, the major goal of the structure of the reader was to make it possible to read in a stream-oriented fashion, a la the CSV or JSON decoders. This makes it possible to use it with undelimited data streams from the likes of object_store.

Contributor

The reader "detects" a zero byte record not by reading bytes from the data stream, but by interpreting the Avro schema it was given. The schema would look something like this: {"type": "record", "name": "EmptyRecord", "fields": []}.

@jecsand838 thanks a lot for the context. This almost sounds like a flaw in the avro spec, if there's no way to consume bytes after an empty record? I'd be curious how other implementations handle it?

Also: what happens if the reader provides a schema with fewer fields than the file's schema? Is the data layout self-describing enough that the decoder can reliably detect the extra bytes and either skip them or blow up?

Asking because (according to my not-super-trustworthy AI coding assistant) there seem to be several cases here:

  1. The reader asked for an empty schema, even tho the file contains data.
    • This should either blow up due to schema mismatch or (more likely) just return however many empty rows the block contained.
    • It is correct to ignore all bytes in the block, because we don't want the columns they encode
    • But wouldn't the decoder have anyway consumed them as part of handling extra/unwanted columns in general?
  2. The writer produced an empty schema.
    • This should return however many empty rows the block contains
    • The block should not contain any bytes, because there was nothing to encode
    • Seems like we should blow up if spurious bytes are present, unless the avro spec says otherwise?
  3. [AI assistant claim] Nulls occupy zero bytes, and so an all-null row would occupy zero bytes even if the schema is non-empty.
    • I question this one because there has to be some way of knowing the row (or its values) are all-NULL, and that information would have to come from the byte stream the decoder is consuming?

Contributor Author

@jecsand838 jecsand838 Jul 8, 2025

@scovich 100% and the Avro spec can definitely be tricky and nuanced.

if there's no way to consume bytes after an empty record? I'd be curious how other implementations handle it?

From my research the answer seems to be that no extra bytes exist after an “empty record” in Avro. It seems like what the code should do is simply count that it has seen one record and move on.

In the Java implementation GenericDatumReader.readRecord iterates over the field list; with zero fields the loop body never executes, so no calls are made and the input position stays where it is. The outer caller still decrements the blockRemaining counter, so the Decoder read logic progresses correctly.

what happens if the reader provides a schema with fewer fields than the file's schema?

If the schema used to write the data (in this instance the file's schema) cannot be resolved with the reader's schema per the avro specification's schema resolution requirements then we should return an error for every record written using that invalid schema imo. Handling the error would then be up to the caller. In the scenario you called out, the missing fields would be ignored and no error returned per the specification: if the writer’s record contains a field with a name not present in the reader’s record, the writer’s value for that field is ignored. (Remember that an Avro schema is just a Record type with the fields of each row defined as Avro Record Fields in the schema Record.)
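
To make that resolution rule concrete, a small illustration (these schemas are invented for the example; the read-and-discard behavior is what the spec's rule implies):

```rust
fn main() {
    // Writer schema (from the file header): encodes an extra "age" field.
    let writer_schema = r#"{"type":"record","name":"User","fields":[
        {"name":"name","type":"string"},
        {"name":"age","type":"int"}]}"#;

    // Reader schema (supplied by the caller): omits "age".
    let reader_schema = r#"{"type":"record","name":"User","fields":[
        {"name":"name","type":"string"}]}"#;

    // Per the spec, a resolving decoder still has to read the "age" bytes for
    // every record (the binary encoding has no per-field length markers) and
    // then discard them, surfacing only "name" to the caller.
    let _ = (writer_schema, reader_schema);
}
```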

Is the data layout self-describing enough that the decoder can reliably detect the extra bytes and either skip them or blow up?

It should be imo. From looking over the Java implementation it seems the writer schema either needs to be:

  1. Explicitly provided: https://github.com/apache/avro/blob/3a9e5a789b5165e0c8c4da799c387fdf84bfb75e/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageDecoder.java#L122
  2. Provided in the first row (Header) of the stream: https://github.com/apache/avro/blob/3a9e5a789b5165e0c8c4da799c387fdf84bfb75e/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java#L142

Or a MissingSchemaException is thrown: https://github.com/apache/avro/blob/3a9e5a789b5165e0c8c4da799c387fdf84bfb75e/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageDecoder.java#L42

Contributor

Thanks! Based on all that, it sounds like the current PR is mostly correct in its handling of empty records... with the future improvement to return the correct number of empty records before dropping the block? :shipit:

Contributor Author

@scovich That's correct.

Comment on lines +301 to +304
match self.schema {
Some(ref schema) => {
let record_decoder = self.make_record_decoder(schema)?;
Ok(Decoder::new(record_decoder, self.batch_size))
Contributor

Coming from the parquet world, I'm a bit surprised the decoder doesn't need access to the header?
But looking at this code, the header only seems to provide schema and compression info?
And the decoder leaves compression to the reader for some reason?

Contributor

Actually, a bigger question -- the same reader provides the header and the data. Does the caller have to keep track of whether the header was already consumed?

  • If this method already consumed it, can they re-read it some other way later?
  • If this method did not consume it, will that confuse the block decoder?

Contributor Author

Coming from the parquet world, I'm a bit surprised the decoder doesn't need access to the header?
But looking at this code, the header only seems to provide schema and compression info?
And the decoder leaves compression to the reader for some reason?

You are correct. In the Avro container file format, the header contains the file schema and the compression codec used for the data blocks. The rest of the file is a series of blocks, each containing some number of records. The Reader is a high-level component that handles the file-level concerns: it reads and parses the header, and then it reads each data block, decompressing it if necessary.

The Decoder is a lower-level component that only knows how to decode a stream of uncompressed Avro record data. This separation of concerns makes the Decoder more flexible, for example, allowing it to be used in async contexts where data arrives in chunks, as shown in the documentation's decode_stream example.
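
For illustration, a rough sketch of that chunked usage, assuming the Decoder's decode, flush, and batch_is_full methods behave as in the snippets above (the import paths are assumptions, and the input stream must carry uncompressed Avro record data):

```rust
use arrow_array::RecordBatch;
use arrow_schema::ArrowError;
use bytes::Bytes;
use futures::{Stream, StreamExt};

use arrow_avro::reader::Decoder; // module path assumed

/// Drive a Decoder from an async stream of Bytes, collecting full batches.
async fn collect_batches<S>(
    mut decoder: Decoder,
    mut input: S,
) -> Result<Vec<RecordBatch>, ArrowError>
where
    S: Stream<Item = Bytes> + Unpin,
{
    let mut buffered: Vec<u8> = Vec::new();
    let mut batches = Vec::new();

    while let Some(chunk) = input.next().await {
        buffered.extend_from_slice(&chunk);
        // Feed as much buffered data as the decoder will accept.
        let consumed = decoder.decode(&buffered)?;
        buffered.drain(..consumed);
        // Emit a batch once the decoder has filled its configured batch size.
        if decoder.batch_is_full() {
            if let Some(batch) = decoder.flush()? {
                batches.push(batch);
            }
        }
    }
    // Stream exhausted: flush whatever partial batch remains.
    if let Some(batch) = decoder.flush()? {
        batches.push(batch);
    }
    Ok(batches)
}
```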

Actually, a bigger question -- the same reader provides the header and the data. Does the caller have to keep track of whether the header was already consumed?

If this method already consumed it, can they re-read it some other way later?
If this method did not consume it, will that confuse the block decoder?

The behavior depends on whether you provide a schema:

  • If you provide a schema (the code snippet you commented on), build_decoder does not read from the input reader. It assumes the caller will handle the Avro container format themselves (i.e., skip the header and decompress blocks before passing the data to the Decoder). If you were to feed the header bytes to the Decoder created this way, it would indeed fail to parse them. This path is intended for advanced, low-level use cases.
  • If you do NOT provide a schema, build_decoder reads from the reader to parse the header and discover the schema. The header is consumed from the reader in a forward-only manner, so you cannot re-read it unless the underlying stream supports Seek. After this call, you would start feeding the subsequent data blocks to the returned Decoder.

So, the caller of build_decoder must be aware of whether the header has been consumed. For most situations, the higher-level Reader is the recommended API, as it automatically handles all this state management for you.

For follow-up work, we could also add more explicit support for Avro's schema evolution capabilities. Currently, the ReaderBuilder allows a user to provide a schema, which will be used for decoding. A more advanced implementation would take this user-provided schema as the "reader's schema" and use the schema from the file header as the "writer's schema". The decoder could then resolve the data according to Avro's schema evolution rules, safely handling things like field aliasing, additions, and reordering. This would significantly improve the crate's robustness in production environments where schemas often change over time.

Contributor

Ah, so the provided schema is taken at face value today, with no attempt to reconcile it against the file's actual schema? What happens if they disagree in matters large or small?

Contributor

Meanwhile -- it sounds like whoever creates a decoder must read the header first and provide the schema. Because otherwise they lose access to the compression codec, which readers are apparently responsible to deal with rather than decoders. That said, I never quite understood why the decoder's flush method doesn't handle compression?

Contributor

Fair point about keeping them separate for loose coupling and optimization reasons. It's just weird that it's so easy to "wedge" yourself by not providing an explicit schema, which consumes the header in order to infer one, which makes it impossible to determine the compression codec you should use without re-reading the header.

Put another way: It looks to me like the decoder shouldn't be in the business of reading headers, ever. Whoever creates the decoder should have already consumed the header (in order to learn compression codec and possibly the schema), and that caller is responsible to provide a schema they deem appropriate (which may or may not have come from the header). As a bonus, any schema reconciliation would also be the caller's responsibility, and the decoder can focus on its job of decoding blocks.

Contributor

Alternatively, we could follow the example of the parquet reader builder. It has a new_with_metadata constructor that allows the caller to provide the parquet footer, and also has a metadata method that allows the caller to access the parquet footer (regardless of whether it was provided or read internally).

Contributor

However, reading an avro header seems a lot simpler than reading a parquet footer... and I don't think there's any (easy?) way to guess how big an avro file's header might be, so we'd have to consume it even if we don't care what it says. That pushes me toward the first (simpler) suggestion, that caller consumes the header and decoder just does what it's told.

Contributor Author

@jecsand838 jecsand838 Jul 8, 2025

@scovich

It's just weird that it's so easy to "wedge" yourself by not providing an explicit schema, which consumes the header in order to infer one, which makes it impossible to determine the compression codec you should use without re-reading the header.

When dealing with the object container file format, i.e. your standard .avro files, the header has to be there and all records must match the schema of the header: https://avro.apache.org/docs/1.11.1/specification/#object-container-files. In the event with_schema is used AND we're reading from an object container file, the schema from with_schema would be the reader schema and the schema from the file's header would be the writer schema. In that event, or more generally whenever we have different but (per the specification) compatible reader and writer schemas, part of decoding each row will involve mapping from the schema used to write the data to the schema the data must match in order to be read into Arrow.

To be clear, that logic is not currently in place because we aren't ready to implement schema resolution just yet, so some of the current code, such as:

        match self.schema {
            Some(ref schema) => {
                let record_decoder = self.make_record_decoder(schema)?;
                Ok(Decoder::new(record_decoder, self.batch_size))
            }
            None => {
                let (_, decoder) = self.build_impl(&mut reader)?;
                Ok(decoder)
            }
        }

is placeholder logic.

Also, I'll add that a compliant and mature Avro decoder is expected to handle schema resolution. It's one of the most powerful and fundamental features of the Avro format, and it's certainly one of the biggest reasons I'm working on this contribution and not using another format. So we will definitely want it completed before marking the arrow-avro reader as public. I just need to finish and merge in the PRs for field metadata, "out of spec" Impala support, and the remaining types, such as Duration, first.

a new_with_metadata constructor that allows the caller to provide the parquet footer, and also has a metadata method that allows the caller to access the parquet footer (regardless of whether it was provided or read internally)

That's not a bad idea tbh. In scenarios where we are decoding streaming blocks (not files), maybe we can use a function like that to inform the writer's schema while the with_schema method still provides the reader's schema. From there maybe we can even support the caller changing the writer's schema as the streaming data comes in. Usually this scenario will arise when decoding rows coming in via protocol wire format: https://avro.apache.org/docs/1.11.1/specification/#protocol-wire-format. I don't think the decoder should support implicitly inferring a schema to be clear. It should be the responsibility of the caller to identify when the writer schema has changed and to call a method that updates the decoder imo.

Contributor

I don't think the decoder should support implicitly inferring a schema to be clear. It should be the responsibility of the caller to identify when the writer schema has changed and to call a method that updates the decoder

How would the caller do that, out of curiosity? A quick skim of the wire protocol didn't turn up anything obvious. Does the sender embed a new header+schema that the caller should notice and respond to as they see fit?

.unwrap();

remaining -= to_read;
fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
Contributor

This seems like a competent async reader implementation... any reason it shouldn't be part of the public API?

Contributor

Ah -- because not everyone wants to use the futures crate

@jecsand838 jecsand838 force-pushed the avro-reader-implementation branch from 4934f3d to b9ec9a5 Compare July 8, 2025 15:51
@jecsand838
Contributor Author

@alamb @scovich I edited this PR's description to include the five follow-up PRs needed to complete the initial arrow-avro Reader implementation. (Not including the Duration PR I just opened)

Let me know what you think and if this one is in an acceptable place to be approved.

Contributor

@scovich scovich left a comment

Good milestone! 🚢

Contributor

@alamb alamb left a comment

Thank you @jecsand838 @scovich and @tustvold

In my mind this PR is a good improvement and a step in the right direction.

I didn't quite follow all the nuance about schema resolution, etc., but it seems like it is well in hand.

Let me know if there are any concerns about merging this; otherwise I'll do it over the next day or two.

Thanks again

pub struct AvroField {
name: String,
data_type: AvroDataType,
pub(crate) data_type: AvroDataType,
Contributor

why is this `pub(crate)` needed? Not a big deal but it seems unnecessary

Contributor Author

That's a good catch! I had changed it by accident while setting up the tests and forgot to change it back. Just pushed up the corrected code for this.

use crate::reader::block::{Block, BlockDecoder};
use crate::reader::header::{Header, HeaderDecoder};
use arrow_schema::ArrowError;
//! Avro reader
Contributor

This interface is really nice 👍

pub fn try_new_with_options(
data_type: &AvroDataType,
options: ReadOptions,
use_utf8view: bool,
Contributor

I was somewhat confused at first about this change as I thought it made the API harder to extend

Then I realized that RecordDecoder is not pub anymore

So maybe we could mark these methods and the struct as pub(crate) to be explicit that it is not re-exported outside

Contributor Author

Another good callout. I just pushed up those changes marking RecordDecoder and its methods as pub(crate).

…r` and its methods from `pub` to `pub(crate)`.
@jecsand838 jecsand838 force-pushed the avro-reader-implementation branch from e172da9 to ee4eee0 Compare July 10, 2025 18:24
jecsand838 and others added 3 commits July 10, 2025 16:43
Introduced `RecordDecoderBuilder` to enable more flexible `RecordDecoder` configuration, including support for `utf8_view` and `strict_mode`. Updated `try_new` to use the builder with default settings.
@jecsand838
Contributor Author

jecsand838 commented Jul 10, 2025

@scovich

I don't think the decoder should support implicitly inferring a schema to be clear. It should be the responsibility of the caller to identify when the writer schema has changed and to call a method that updates the decoder

How would the caller do that, out of curiosity? A quick skim of the wire protocol didn't turn up anything obvious. Does the sender embed a new header+schema that the caller should notice and respond to as they see fit?

The caller can do this by being signaled to either read a new .avsc file, retrieve an updated schema definition from a schema registry, or attempt to decode an Avro header from the sender (which I think happens) using the Header decoder in arrow-avro/src/reader/header.rs. That signal could come from detecting the magic bytes (or something else defined in the specification) or from getting a ParseError or SchemaError back from the Decoder. I'm personally most familiar with schema registry (Kafka) use cases.

We can definitely add utility functions to assist with this as needed and/or make the Reader and Decoder configurable to attempt to detect a new header definition record, but imo the responsibility should be on the caller since it's very use-case specific.

@jecsand838 jecsand838 force-pushed the avro-reader-implementation branch from f9ac498 to bf6e311 Compare July 10, 2025 22:30
@alamb
Contributor

alamb commented Jul 11, 2025

🚀

@alamb alamb merged commit ba751bf into apache:main Jul 11, 2025
24 checks passed
@jecsand838 jecsand838 deleted the avro-reader-implementation branch July 11, 2025 19:21
alamb pushed a commit that referenced this pull request Jul 22, 2025
# Which issue does this PR close?

- Part of #4886

- Follow up to #7834

# Rationale for this change

The initial Avro reader implementation contained an under-developed and
temporary safeguard to prevent infinite loops when processing records
that consumed zero bytes from the input buffer.

When the `Decoder` reported that zero bytes were consumed, the `Reader`
would advance its cursor to the end of the current data block. While
this successfully prevented an infinite loop, it had the critical side
effect of silently discarding any remaining data in that block, leading
to potential data loss.

This change enhances the decoding logic to handle these zero-byte values
correctly, ensuring that the `Reader` makes proper progress without
dropping data and without risking an infinite loop.

# What changes are included in this PR?

- **Refined Decoder Logic**: The `Decoder` has been updated to
accurately track and report the number of bytes consumed for all values,
including valid zero-length records like `null` or empty `bytes`. This
ensures the decoder always makes forward progress.
- **Removal of Data-Skipping Safeguard**: The logic in the `Reader` that
previously advanced to the end of a block on a zero-byte read has been
removed. The reader now relies on the decoder to report accurate
consumption and advances its cursor incrementally and safely.
- **New integration test** using a temporary `zero_byte.avro` file created
via this python script:
https://gist.github.com/jecsand838/e57647d0d12853f3cf07c350a6a40395

# Are these changes tested?

Yes, a new `test_read_zero_byte_avro_file` test was added that reads the
new `zero_byte.avro` file and confirms the update.

# Are there any user-facing changes?

N/A

# Follow-Up PRs

1. PR to update `test_read_zero_byte_avro_file` once
apache/arrow-testing#109 is merged in.
alamb pushed a commit that referenced this pull request Aug 7, 2025
…Resolution (#8006)

# Which issue does this PR close?

- Part of #4886

- Follow up to #7834

# Rationale for this change

Apache Avro’s [single object
encoding](https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding)
prefixes every record with the marker `0xC3 0x01` followed by a `Rabin`
[schema fingerprint
](https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints)
so that readers can identify the correct writer schema without carrying
the full definition in each message.
While the current `arrow‑avro` implementation can read container files,
it cannot ingest these framed messages or handle streams where the
writer schema changes over time.

The Avro specification recommends computing a 64‑bit CRC‑64‑AVRO (Rabin)
hashed fingerprint of the [parsed canonical form of a
schema](https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas)
to look up the `Schema` from a local schema store or registry.

This PR introduces **`SchemaStore`** and **fingerprinting** to enable:

* **Zero‑copy schema identification** for decoding streaming Avro
messages published in single‑object format (e.g. Kafka, Pulsar) into
Arrow.
* **Dynamic schema evolution** by laying the foundation to resolve
writer/reader schema differences on the fly.
**NOTE:** Schema Resolution support in `Codec` and `RecordDecoder` is
coming in the next PR.

# What changes are included in this PR?

| Area | Highlights |
| --- | --- |
| **`reader/mod.rs`** | Decoder now detects the `C3 01` prefix, extracts the fingerprint, looks up the writer schema in a `SchemaStore`, and switches to an LRU-cached `RecordDecoder` without interrupting streaming; supports `static_store_mode` to skip the 2-byte peek for high‑throughput fixed‑schema pipelines. |
| **`ReaderBuilder`** | New builder configuration methods: `.with_writer_schema_store`, `.with_active_fingerprint`, `.with_static_store_mode`, `.with_reader_schema`, `.with_max_decoder_cache_size`, with rigorous validation to prevent misconfiguration. |
| **Unit tests** | New tests covering fingerprint generation, store registration/lookup, schema switching, unknown‑fingerprint errors, and interaction with UTF8‑view decoding. |
| **Docs & Examples** | Extensive inline docs with examples on all new public methods / structs. |
---

# Are these changes tested?

Yes.  New tests cover:

1. **Fingerprinting** against the canonical examples from the Avro spec
2. **`SchemaStore` behavior**: deduplication, duplicate registration, and
lookup.
3. **Decoder fast‑path** with `static_store_mode=true`, ensuring the
prefix is treated as payload, the 2 byte peek is skipped, and no schema
switch is attempted.

# Are there any user-facing changes?

N/A

# Follow-Up PRs

1. Implement Schema Resolution Functionality in Codec and RecordDecoder
2. Add ID `Fingerprint` variant on `SchemaStore` for Confluent Schema
Registry compatibility
3. Improve arrow-avro errors + add more benchmarks & examples to prepare
for public release

---------

Co-authored-by: Ryan Johnson <[email protected]>