From 781de807e47607f32edd26c2d0885e8818496692 Mon Sep 17 00:00:00 2001 From: nathaniel-d-ef Date: Tue, 27 May 2025 18:26:51 -0500 Subject: [PATCH 1/7] Add support for decoding Avro fixed-size binary data Introduced handling for Avro `fixed` type in the decoder, enabling proper initialization, decoding, and array construction. Added corresponding tests to validate decoding functionality and edge cases like empty arrays. Implemented `get_fixed` method in `AvroCursor` for precise byte reading. --- arrow-avro/src/codec.rs | 14 ++++---- arrow-avro/src/reader/cursor.rs | 12 +++++++ arrow-avro/src/reader/record.rs | 63 ++++++++++++++++++++++++++++++++- 3 files changed, 80 insertions(+), 9 deletions(-) diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs index 70f162f1471d..29bcbc5344c5 100644 --- a/arrow-avro/src/codec.rs +++ b/arrow-avro/src/codec.rs @@ -405,18 +405,16 @@ fn make_data_type<'a>( codec: Codec::List(Arc::new(field)), }) } - ComplexType::Fixed(f) => { - let size = f.size.try_into().map_err(|e| { - ArrowError::ParseError(format!("Overflow converting size to i32: {e}")) - })?; + ComplexType::Fixed(fx) => { + let size = fx.size as i32; - let field = AvroDataType { + let fixed_dt = AvroDataType { nullability: None, - metadata: f.attributes.field_metadata(), + metadata: fx.attributes.field_metadata(), codec: Codec::Fixed(size), }; - resolver.register(f.name, namespace, field.clone()); - Ok(field) + resolver.register(fx.name, namespace, fixed_dt.clone()); + Ok(fixed_dt) } ComplexType::Enum(e) => Err(ArrowError::NotYetImplemented(format!( "Enum of {e:?} not currently supported" diff --git a/arrow-avro/src/reader/cursor.rs b/arrow-avro/src/reader/cursor.rs index 4b6a5a4d65db..1b89ff86c38c 100644 --- a/arrow-avro/src/reader/cursor.rs +++ b/arrow-avro/src/reader/cursor.rs @@ -118,4 +118,16 @@ impl<'a> AvroCursor<'a> { self.buf = &self.buf[8..]; Ok(ret) } + + /// Read exactly `n` bytes from the buffer (e.g. for Avro `fixed`). + pub(crate) fn get_fixed(&mut self, n: usize) -> Result<&'a [u8], ArrowError> { + if self.buf.len() < n { + return Err(ArrowError::ParseError( + "Unexpected EOF reading fixed".to_string(), + )); + } + let ret = &self.buf[..n]; + self.buf = &self.buf[n..]; + Ok(ret) + } } diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs index 3466b064455f..b4d498eda2dc 100644 --- a/arrow-avro/src/reader/record.rs +++ b/arrow-avro/src/reader/record.rs @@ -122,6 +122,7 @@ enum Decoder { Vec, Box, ), + Fixed(i32, Vec), Nullable(Nullability, NullBufferBuilder, Box), } @@ -157,7 +158,7 @@ impl Decoder { Codec::TimestampMicros(is_utc) => { Self::TimestampMicros(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY)) } - Codec::Fixed(_) => return nyi("decoding fixed"), + Codec::Fixed(sz) => Self::Fixed(*sz, Vec::with_capacity(DEFAULT_CAPACITY)), Codec::Interval => return nyi("decoding interval"), Codec::List(item) => { let decoder = Self::try_new(item)?; @@ -232,6 +233,9 @@ impl Decoder { moff.push_length(0); } Self::Nullable(_, _, _) => unreachable!("Nulls cannot be nested"), + Self::Fixed(sz, accum) => { + accum.extend(std::iter::repeat(0u8).take(*sz as usize)); + } } } @@ -282,6 +286,10 @@ impl Decoder { false => e.append_null(), } } + Self::Fixed(sz, accum) => { + let fx = buf.get_fixed(*sz as usize)?; + accum.extend_from_slice(fx); + } } Ok(()) } @@ -383,6 +391,12 @@ impl Decoder { let map_arr = MapArray::new(map_field.clone(), moff, entries_struct, nulls, false); Arc::new(map_arr) } + Self::Fixed(sz, accum) => { + let b: Buffer = flush_values(accum).into(); + let arr = FixedSizeBinaryArray::try_new(*sz, b, nulls) + .map_err(|e| ArrowError::ParseError(e.to_string()))?; + Arc::new(arr) + } }) } } @@ -542,6 +556,53 @@ mod tests { assert_eq!(map_arr.value_length(0), 0); } + #[test] + fn test_fixed_decoding() { + let avro_type = avro_from_codec(Codec::Fixed(3)); + let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder"); + + // First fixed-size item + let data1 = [1u8, 2, 3]; + let mut cursor1 = AvroCursor::new(&data1); + decoder.decode(&mut cursor1).expect("Failed to decode data1"); + assert_eq!(cursor1.position(), 3, "Cursor should advance by fixed size"); + + // Second fixed-size item + let data2 = [4u8, 5, 6]; + let mut cursor2 = AvroCursor::new(&data2); + decoder.decode(&mut cursor2).expect("Failed to decode data2"); + assert_eq!(cursor2.position(), 3, "Cursor should advance by fixed size"); + + let array = decoder.flush(None).expect("Failed to flush decoder"); + + assert_eq!(array.len(), 2, "Array should contain two items"); + let fixed_size_binary_array = array + .as_any() + .downcast_ref::() + .expect("Failed to downcast to FixedSizeBinaryArray"); + + assert_eq!(fixed_size_binary_array.value_length(), 3, "Fixed size of binary values should be 3"); + assert_eq!(fixed_size_binary_array.value(0), &[1, 2, 3], "First item mismatch"); + assert_eq!(fixed_size_binary_array.value(1), &[4, 5, 6], "Second item mismatch"); + } + + #[test] + fn test_fixed_decoding_empty() { + let avro_type = avro_from_codec(Codec::Fixed(5)); + let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder"); + + let array = decoder.flush(None).expect("Failed to flush decoder for empty input"); + + assert_eq!(array.len(), 0, "Array should be empty"); + let fixed_size_binary_array = array + .as_any() + .downcast_ref::() + .expect("Failed to downcast to FixedSizeBinaryArray for empty array"); + + assert_eq!(fixed_size_binary_array.value_length(), 5, "Fixed size of binary values should be 5 as per type"); + } + + #[test] fn test_array_decoding() { let item_dt = avro_from_codec(Codec::Int32); From dfb0222d5d8b6e94fd8e82f589c399e58ed7692f Mon Sep 17 00:00:00 2001 From: nathaniel-d-ef Date: Wed, 28 May 2025 09:47:47 -0500 Subject: [PATCH 2/7] Remove redundant comments in test_fixed_decoding function --- arrow-avro/src/reader/record.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs index b4d498eda2dc..3b61ec086289 100644 --- a/arrow-avro/src/reader/record.rs +++ b/arrow-avro/src/reader/record.rs @@ -560,14 +560,12 @@ mod tests { fn test_fixed_decoding() { let avro_type = avro_from_codec(Codec::Fixed(3)); let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder"); - - // First fixed-size item + let data1 = [1u8, 2, 3]; let mut cursor1 = AvroCursor::new(&data1); decoder.decode(&mut cursor1).expect("Failed to decode data1"); assert_eq!(cursor1.position(), 3, "Cursor should advance by fixed size"); - - // Second fixed-size item + let data2 = [4u8, 5, 6]; let mut cursor2 = AvroCursor::new(&data2); decoder.decode(&mut cursor2).expect("Failed to decode data2"); From e94e4889840281749e3c3495c37f82e1fe9abc34 Mon Sep 17 00:00:00 2001 From: nathaniel-d-ef Date: Wed, 28 May 2025 10:16:34 -0500 Subject: [PATCH 3/7] Refactor Fixed type handling to use `try_into` for size conversion and minimize diffs with earlier iteration --- arrow-avro/src/codec.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs index 29bcbc5344c5..70f162f1471d 100644 --- a/arrow-avro/src/codec.rs +++ b/arrow-avro/src/codec.rs @@ -405,16 +405,18 @@ fn make_data_type<'a>( codec: Codec::List(Arc::new(field)), }) } - ComplexType::Fixed(fx) => { - let size = fx.size as i32; + ComplexType::Fixed(f) => { + let size = f.size.try_into().map_err(|e| { + ArrowError::ParseError(format!("Overflow converting size to i32: {e}")) + })?; - let fixed_dt = AvroDataType { + let field = AvroDataType { nullability: None, - metadata: fx.attributes.field_metadata(), + metadata: f.attributes.field_metadata(), codec: Codec::Fixed(size), }; - resolver.register(fx.name, namespace, fixed_dt.clone()); - Ok(fixed_dt) + resolver.register(f.name, namespace, field.clone()); + Ok(field) } ComplexType::Enum(e) => Err(ArrowError::NotYetImplemented(format!( "Enum of {e:?} not currently supported" From 68639083960389716e7d8e48d2a9bb3692c381f4 Mon Sep 17 00:00:00 2001 From: nathaniel-d-ef Date: Wed, 28 May 2025 15:49:39 -0500 Subject: [PATCH 4/7] Add support for Avro Uuid logical type Introduce a new `Codec::Uuid` variant to handle Avro Uuid logical types, mapping it to a `FixedSizeBinary` of length 16 in Arrow. This includes updates to codec handling, relevant match branches, and a unit test to validate functionality. --- arrow-avro/src/codec.rs | 15 +++++++++++++++ arrow-avro/src/reader/record.rs | 1 + 2 files changed, 16 insertions(+) diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs index 70f162f1471d..f9aa546d2bd0 100644 --- a/arrow-avro/src/codec.rs +++ b/arrow-avro/src/codec.rs @@ -192,6 +192,8 @@ pub enum Codec { /// Represents Avro fixed type, maps to Arrow's FixedSizeBinary data type /// The i32 parameter indicates the fixed binary size Fixed(i32), + /// Represents Avro Uuid type, a FixedSizeBinary with a length of 16 + Uuid, /// Represents Avro array type, maps to Arrow's List data type List(Arc), /// Represents Avro record type, maps to Arrow's Struct data type @@ -225,6 +227,7 @@ impl Codec { } Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano), Self::Fixed(size) => DataType::FixedSizeBinary(*size), + Self::Uuid => DataType::FixedSizeBinary(16), Self::List(f) => { DataType::List(Arc::new(f.field_with_name(Field::LIST_FIELD_DEFAULT_NAME))) } @@ -457,6 +460,7 @@ fn make_data_type<'a>( *c = Codec::TimestampMicros(false) } (Some("duration"), c @ Codec::Fixed(12)) => *c = Codec::Interval, + (Some("uuid"), c @ Codec::Utf8) => *c = Codec::Uuid, (Some(logical), _) => { // Insert unrecognized logical type into metadata map field.metadata.insert("logicalType".into(), logical.into()); @@ -582,6 +586,17 @@ mod tests { assert!(matches!(result.codec, Codec::TimestampMicros(false))); } + + #[test] + fn test_uuid_type() { + let mut codec = Codec::Fixed(16); + + if let c @ Codec::Fixed(16) = &mut codec { + *c = Codec::Uuid; + } + + assert!(matches!(codec, Codec::Uuid)); + } #[test] fn test_duration_logical_type() { diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs index 3b61ec086289..d037cf808488 100644 --- a/arrow-avro/src/reader/record.rs +++ b/arrow-avro/src/reader/record.rs @@ -197,6 +197,7 @@ impl Decoder { Box::new(val_dec), ) } + Codec::Uuid => Self::Fixed(16, Vec::with_capacity(DEFAULT_CAPACITY)), }; Ok(match data_type.nullability() { From 23cc74dbdae7ad3ccb769e4078cc622150154617 Mon Sep 17 00:00:00 2001 From: nathaniel-d-ef Date: Tue, 17 Jun 2025 16:27:49 -0500 Subject: [PATCH 5/7] Refactor test formatting and assertions for clarity and consistency --- arrow-avro/src/codec.rs | 6 ++--- arrow-avro/src/reader/record.rs | 42 +++++++++++++++++++++++++-------- 2 files changed, 35 insertions(+), 13 deletions(-) diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs index f9aa546d2bd0..caac390f3d07 100644 --- a/arrow-avro/src/codec.rs +++ b/arrow-avro/src/codec.rs @@ -586,15 +586,15 @@ mod tests { assert!(matches!(result.codec, Codec::TimestampMicros(false))); } - + #[test] fn test_uuid_type() { let mut codec = Codec::Fixed(16); - + if let c @ Codec::Fixed(16) = &mut codec { *c = Codec::Uuid; } - + assert!(matches!(codec, Codec::Uuid)); } diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs index d037cf808488..d247264dba82 100644 --- a/arrow-avro/src/reader/record.rs +++ b/arrow-avro/src/reader/record.rs @@ -561,15 +561,19 @@ mod tests { fn test_fixed_decoding() { let avro_type = avro_from_codec(Codec::Fixed(3)); let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder"); - + let data1 = [1u8, 2, 3]; let mut cursor1 = AvroCursor::new(&data1); - decoder.decode(&mut cursor1).expect("Failed to decode data1"); + decoder + .decode(&mut cursor1) + .expect("Failed to decode data1"); assert_eq!(cursor1.position(), 3, "Cursor should advance by fixed size"); - + let data2 = [4u8, 5, 6]; let mut cursor2 = AvroCursor::new(&data2); - decoder.decode(&mut cursor2).expect("Failed to decode data2"); + decoder + .decode(&mut cursor2) + .expect("Failed to decode data2"); assert_eq!(cursor2.position(), 3, "Cursor should advance by fixed size"); let array = decoder.flush(None).expect("Failed to flush decoder"); @@ -580,9 +584,21 @@ mod tests { .downcast_ref::() .expect("Failed to downcast to FixedSizeBinaryArray"); - assert_eq!(fixed_size_binary_array.value_length(), 3, "Fixed size of binary values should be 3"); - assert_eq!(fixed_size_binary_array.value(0), &[1, 2, 3], "First item mismatch"); - assert_eq!(fixed_size_binary_array.value(1), &[4, 5, 6], "Second item mismatch"); + assert_eq!( + fixed_size_binary_array.value_length(), + 3, + "Fixed size of binary values should be 3" + ); + assert_eq!( + fixed_size_binary_array.value(0), + &[1, 2, 3], + "First item mismatch" + ); + assert_eq!( + fixed_size_binary_array.value(1), + &[4, 5, 6], + "Second item mismatch" + ); } #[test] @@ -590,15 +606,21 @@ mod tests { let avro_type = avro_from_codec(Codec::Fixed(5)); let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder"); - let array = decoder.flush(None).expect("Failed to flush decoder for empty input"); + let array = decoder + .flush(None) + .expect("Failed to flush decoder for empty input"); assert_eq!(array.len(), 0, "Array should be empty"); let fixed_size_binary_array = array .as_any() .downcast_ref::() .expect("Failed to downcast to FixedSizeBinaryArray for empty array"); - - assert_eq!(fixed_size_binary_array.value_length(), 5, "Fixed size of binary values should be 5 as per type"); + + assert_eq!( + fixed_size_binary_array.value_length(), + 5, + "Fixed size of binary values should be 5 as per type" + ); } From 91f8442d794d98d2cca319d341493597fb444ab2 Mon Sep 17 00:00:00 2001 From: nathaniel-d-ef Date: Fri, 20 Jun 2025 11:26:11 -0500 Subject: [PATCH 6/7] Add test for Uuid decoding in Avro reader --- arrow-avro/src/reader/record.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs index d247264dba82..c71e7bbf50be 100644 --- a/arrow-avro/src/reader/record.rs +++ b/arrow-avro/src/reader/record.rs @@ -622,7 +622,19 @@ mod tests { "Fixed size of binary values should be 5 as per type" ); } - + + #[test] + fn test_uuid_decoding() { + let avro_type = avro_from_codec(Codec::Uuid); + let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder"); + + let data1 = [1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]; + let mut cursor1 = AvroCursor::new(&data1); + decoder + .decode(&mut cursor1) + .expect("Failed to decode data1"); + assert_eq!(cursor1.position(), 16, "Cursor should advance by fixed size"); + } #[test] fn test_array_decoding() { From e605642a34a4d9537b8e85d597001f72d52e480a Mon Sep 17 00:00:00 2001 From: nathaniel-d-ef Date: Fri, 27 Jun 2025 14:29:49 -0500 Subject: [PATCH 7/7] Improve formatting in `test_uuid_decoding` for readability --- arrow-avro/src/reader/record.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs index c71e7bbf50be..6d1a9f751ace 100644 --- a/arrow-avro/src/reader/record.rs +++ b/arrow-avro/src/reader/record.rs @@ -622,18 +622,22 @@ mod tests { "Fixed size of binary values should be 5 as per type" ); } - + #[test] fn test_uuid_decoding() { let avro_type = avro_from_codec(Codec::Uuid); let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder"); - + let data1 = [1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]; let mut cursor1 = AvroCursor::new(&data1); decoder .decode(&mut cursor1) .expect("Failed to decode data1"); - assert_eq!(cursor1.position(), 16, "Cursor should advance by fixed size"); + assert_eq!( + cursor1.position(), + 16, + "Cursor should advance by fixed size" + ); } #[test]