diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs
index 70f162f1471d..caac390f3d07 100644
--- a/arrow-avro/src/codec.rs
+++ b/arrow-avro/src/codec.rs
@@ -192,6 +192,8 @@ pub enum Codec {
     /// Represents Avro fixed type, maps to Arrow's FixedSizeBinary data type
     /// The i32 parameter indicates the fixed binary size
     Fixed(i32),
+    /// Represents Avro uuid logical type on a 16-byte fixed, maps to Arrow's FixedSizeBinary(16)
+    Uuid,
     /// Represents Avro array type, maps to Arrow's List data type
     List(Arc<AvroDataType>),
     /// Represents Avro record type, maps to Arrow's Struct data type
@@ -225,6 +227,7 @@ impl Codec {
             }
             Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
             Self::Fixed(size) => DataType::FixedSizeBinary(*size),
+            Self::Uuid => DataType::FixedSizeBinary(16),
             Self::List(f) => {
                 DataType::List(Arc::new(f.field_with_name(Field::LIST_FIELD_DEFAULT_NAME)))
             }
@@ -457,6 +460,10 @@ fn make_data_type<'a>(
                     *c = Codec::TimestampMicros(false)
                 }
                 (Some("duration"), c @ Codec::Fixed(12)) => *c = Codec::Interval,
+                // The decoder reads `Codec::Uuid` as 16 raw bytes, so only promote the
+                // fixed(16) representation. A string-annotated uuid stays `Utf8` and is
+                // tagged with its logicalType by the fallback arm below.
+                (Some("uuid"), c @ Codec::Fixed(16)) => *c = Codec::Uuid,
                 (Some(logical), _) => {
                     // Insert unrecognized logical type into metadata map
                     field.metadata.insert("logicalType".into(), logical.into());
@@ -583,6 +590,17 @@ mod tests {
         assert!(matches!(result.codec, Codec::TimestampMicros(false)));
     }
 
+    #[test]
+    fn test_uuid_type() {
+        let mut codec = Codec::Fixed(16);
+
+        if let c @ Codec::Fixed(16) = &mut codec {
+            *c = Codec::Uuid;
+        }
+
+        assert!(matches!(codec, Codec::Uuid));
+    }
+
     #[test]
     fn test_duration_logical_type() {
         let mut codec = Codec::Fixed(12);
diff --git a/arrow-avro/src/reader/cursor.rs b/arrow-avro/src/reader/cursor.rs
index 4b6a5a4d65db..1b89ff86c38c 100644
--- a/arrow-avro/src/reader/cursor.rs
+++ b/arrow-avro/src/reader/cursor.rs
@@ -118,4 +118,19 @@ impl<'a> AvroCursor<'a> {
         self.buf = &self.buf[8..];
         Ok(ret)
     }
+
+    /// Read exactly `n` bytes from the buffer (e.g. for Avro `fixed`).
+    ///
+    /// Returns a `ParseError` without consuming anything when fewer than `n`
+    /// bytes remain.
+    pub(crate) fn get_fixed(&mut self, n: usize) -> Result<&'a [u8], ArrowError> {
+        if self.buf.len() < n {
+            return Err(ArrowError::ParseError(
+                "Unexpected EOF reading fixed".to_string(),
+            ));
+        }
+        let ret = &self.buf[..n];
+        self.buf = &self.buf[n..];
+        Ok(ret)
+    }
 }
diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs
index 3466b064455f..6d1a9f751ace 100644
--- a/arrow-avro/src/reader/record.rs
+++ b/arrow-avro/src/reader/record.rs
@@ -122,6 +122,7 @@ enum Decoder {
         Vec<u8>,
         Box<Decoder>,
     ),
+    Fixed(i32, Vec<u8>),
     Nullable(Nullability, NullBufferBuilder, Box<Decoder>),
 }
 
@@ -157,7 +158,7 @@ impl Decoder {
             Codec::TimestampMicros(is_utc) => {
                 Self::TimestampMicros(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
             }
-            Codec::Fixed(_) => return nyi("decoding fixed"),
+            Codec::Fixed(sz) => Self::Fixed(*sz, Vec::with_capacity(DEFAULT_CAPACITY)),
             Codec::Interval => return nyi("decoding interval"),
             Codec::List(item) => {
                 let decoder = Self::try_new(item)?;
@@ -196,6 +197,7 @@ impl Decoder {
                     Box::new(val_dec),
                 )
             }
+            Codec::Uuid => Self::Fixed(16, Vec::with_capacity(DEFAULT_CAPACITY)),
         };
 
         Ok(match data_type.nullability() {
@@ -232,6 +234,9 @@ impl Decoder {
                 moff.push_length(0);
             }
             Self::Nullable(_, _, _) => unreachable!("Nulls cannot be nested"),
+            Self::Fixed(sz, accum) => {
+                accum.extend(std::iter::repeat(0u8).take(*sz as usize));
+            }
         }
     }
 
@@ -282,6 +287,10 @@ impl Decoder {
                     false => e.append_null(),
                 }
             }
+            Self::Fixed(sz, accum) => {
+                let fx = buf.get_fixed(*sz as usize)?;
+                accum.extend_from_slice(fx);
+            }
         }
         Ok(())
     }
@@ -383,6 +392,12 @@ impl Decoder {
                 let map_arr = MapArray::new(map_field.clone(), moff, entries_struct, nulls, false);
                 Arc::new(map_arr)
             }
+            Self::Fixed(sz, accum) => {
+                let b: Buffer = flush_values(accum).into();
+                let arr = FixedSizeBinaryArray::try_new(*sz, b, nulls)
+                    .map_err(|e| ArrowError::ParseError(e.to_string()))?;
+                Arc::new(arr)
+            }
         })
     }
 }
@@ -542,6 +557,99 @@ mod tests {
         assert_eq!(map_arr.value_length(0), 0);
     }
 
+    #[test]
+    fn test_fixed_decoding() {
+        let avro_type = avro_from_codec(Codec::Fixed(3));
+        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
+
+        let data1 = [1u8, 2, 3];
+        let mut cursor1 = AvroCursor::new(&data1);
+        decoder
+            .decode(&mut cursor1)
+            .expect("Failed to decode data1");
+        assert_eq!(cursor1.position(), 3, "Cursor should advance by fixed size");
+
+        let data2 = [4u8, 5, 6];
+        let mut cursor2 = AvroCursor::new(&data2);
+        decoder
+            .decode(&mut cursor2)
+            .expect("Failed to decode data2");
+        assert_eq!(cursor2.position(), 3, "Cursor should advance by fixed size");
+
+        let array = decoder.flush(None).expect("Failed to flush decoder");
+
+        assert_eq!(array.len(), 2, "Array should contain two items");
+        let fixed_size_binary_array = array
+            .as_any()
+            .downcast_ref::<FixedSizeBinaryArray>()
+            .expect("Failed to downcast to FixedSizeBinaryArray");
+
+        assert_eq!(
+            fixed_size_binary_array.value_length(),
+            3,
+            "Fixed size of binary values should be 3"
+        );
+        assert_eq!(
+            fixed_size_binary_array.value(0),
+            &[1, 2, 3],
+            "First item mismatch"
+        );
+        assert_eq!(
+            fixed_size_binary_array.value(1),
+            &[4, 5, 6],
+            "Second item mismatch"
+        );
+    }
+
+    #[test]
+    fn test_fixed_decoding_empty() {
+        let avro_type = avro_from_codec(Codec::Fixed(5));
+        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
+
+        let array = decoder
+            .flush(None)
+            .expect("Failed to flush decoder for empty input");
+
+        assert_eq!(array.len(), 0, "Array should be empty");
+        let fixed_size_binary_array = array
+            .as_any()
+            .downcast_ref::<FixedSizeBinaryArray>()
+            .expect("Failed to downcast to FixedSizeBinaryArray for empty array");
+
+        assert_eq!(
+            fixed_size_binary_array.value_length(),
+            5,
+            "Fixed size of binary values should be 5 as per type"
+        );
+    }
+
+    #[test]
+    fn test_uuid_decoding() {
+        let avro_type = avro_from_codec(Codec::Uuid);
+        let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
+
+        let data1 = [1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
+        let mut cursor1 = AvroCursor::new(&data1);
+        decoder
+            .decode(&mut cursor1)
+            .expect("Failed to decode data1");
+        assert_eq!(
+            cursor1.position(),
+            16,
+            "Cursor should advance by fixed size"
+        );
+
+        let array = decoder.flush(None).expect("Failed to flush decoder");
+        assert_eq!(array.len(), 1, "Array should contain one UUID");
+
+        let uuid_array = array
+            .as_any()
+            .downcast_ref::<FixedSizeBinaryArray>()
+            .expect("Failed to downcast to FixedSizeBinaryArray");
+        assert_eq!(uuid_array.value_length(), 16, "UUID width should be 16");
+        assert_eq!(uuid_array.value(0), &data1, "UUID bytes mismatch");
+    }
+
     #[test]
     fn test_array_decoding() {
         let item_dt = avro_from_codec(Codec::Int32);