Skip to content
15 changes: 15 additions & 0 deletions arrow-avro/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ pub enum Codec {
/// Represents Avro fixed type, maps to Arrow's FixedSizeBinary data type
/// The i32 parameter indicates the fixed binary size
Fixed(i32),
/// Represents Avro Uuid type, a FixedSizeBinary with a length of 16
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the rfc link, it says that A UUID is 128 bits long, and requires no central registration process, does the length 16 here suffice for that?

for the uuid crate, the length for the example 67e55044-10b1-426f-9247-bb680e5fe0c8 is 36

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should; 16 bytes == 128 bits. It looks to me like the UUID crate specifies 16 & 128 as well. Do you mean character count? That's independent of byte size.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it, thanks

Uuid,
/// Represents Avro array type, maps to Arrow's List data type
List(Arc<AvroDataType>),
/// Represents Avro record type, maps to Arrow's Struct data type
Expand Down Expand Up @@ -225,6 +227,7 @@ impl Codec {
}
Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
Self::Fixed(size) => DataType::FixedSizeBinary(*size),
Self::Uuid => DataType::FixedSizeBinary(16),
Self::List(f) => {
DataType::List(Arc::new(f.field_with_name(Field::LIST_FIELD_DEFAULT_NAME)))
}
Expand Down Expand Up @@ -457,6 +460,7 @@ fn make_data_type<'a>(
*c = Codec::TimestampMicros(false)
}
(Some("duration"), c @ Codec::Fixed(12)) => *c = Codec::Interval,
(Some("uuid"), c @ Codec::Utf8) => *c = Codec::Uuid,
(Some(logical), _) => {
// Insert unrecognized logical type into metadata map
field.metadata.insert("logicalType".into(), logical.into());
Expand Down Expand Up @@ -583,6 +587,17 @@ mod tests {
assert!(matches!(result.codec, Codec::TimestampMicros(false)));
}

#[test]
fn test_uuid_type() {
let mut codec = Codec::Fixed(16);

if let c @ Codec::Fixed(16) = &mut codec {
*c = Codec::Uuid;
}

assert!(matches!(codec, Codec::Uuid));
}

#[test]
fn test_duration_logical_type() {
let mut codec = Codec::Fixed(12);
Expand Down
12 changes: 12 additions & 0 deletions arrow-avro/src/reader/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,16 @@ impl<'a> AvroCursor<'a> {
self.buf = &self.buf[8..];
Ok(ret)
}

/// Read exactly `n` bytes from the buffer (e.g. for Avro `fixed`).
pub(crate) fn get_fixed(&mut self, n: usize) -> Result<&'a [u8], ArrowError> {
if self.buf.len() < n {
return Err(ArrowError::ParseError(
"Unexpected EOF reading fixed".to_string(),
));
}
let ret = &self.buf[..n];
self.buf = &self.buf[n..];
Ok(ret)
}
}
100 changes: 99 additions & 1 deletion arrow-avro/src/reader/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ enum Decoder {
Vec<u8>,
Box<Decoder>,
),
Fixed(i32, Vec<u8>),
Nullable(Nullability, NullBufferBuilder, Box<Decoder>),
}

Expand Down Expand Up @@ -157,7 +158,7 @@ impl Decoder {
Codec::TimestampMicros(is_utc) => {
Self::TimestampMicros(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
}
Codec::Fixed(_) => return nyi("decoding fixed"),
Codec::Fixed(sz) => Self::Fixed(*sz, Vec::with_capacity(DEFAULT_CAPACITY)),
Codec::Interval => return nyi("decoding interval"),
Codec::List(item) => {
let decoder = Self::try_new(item)?;
Expand Down Expand Up @@ -196,6 +197,7 @@ impl Decoder {
Box::new(val_dec),
)
}
Codec::Uuid => Self::Fixed(16, Vec::with_capacity(DEFAULT_CAPACITY)),
};

Ok(match data_type.nullability() {
Expand Down Expand Up @@ -232,6 +234,9 @@ impl Decoder {
moff.push_length(0);
}
Self::Nullable(_, _, _) => unreachable!("Nulls cannot be nested"),
Self::Fixed(sz, accum) => {
accum.extend(std::iter::repeat(0u8).take(*sz as usize));
}
}
}

Expand Down Expand Up @@ -282,6 +287,10 @@ impl Decoder {
false => e.append_null(),
}
}
Self::Fixed(sz, accum) => {
let fx = buf.get_fixed(*sz as usize)?;
accum.extend_from_slice(fx);
}
}
Ok(())
}
Expand Down Expand Up @@ -383,6 +392,12 @@ impl Decoder {
let map_arr = MapArray::new(map_field.clone(), moff, entries_struct, nulls, false);
Arc::new(map_arr)
}
Self::Fixed(sz, accum) => {
let b: Buffer = flush_values(accum).into();
let arr = FixedSizeBinaryArray::try_new(*sz, b, nulls)
.map_err(|e| ArrowError::ParseError(e.to_string()))?;
Arc::new(arr)
}
})
}
}
Expand Down Expand Up @@ -542,6 +557,89 @@ mod tests {
assert_eq!(map_arr.value_length(0), 0);
}

#[test]
fn test_fixed_decoding() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for these tests. Do we need to add a test for the UUID type?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems unnecessary to me, as it would be nearly identical to fixed but with a hard-coded length. I can add one though, if necessary.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not entirely sure about this; we could involve a maintainer for a double check.
just noticed that we already have tests for other types.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a test explicitly for reading uuids is important (maybe it is covered by end to end coverage somewhere else however)

I think it is important to ensure we don't accidentally break UUID handling in the future\

Perhaps we can add one in a follow on PR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb We don't have an E2E test covering UUID. I'll be sure to create an Avro UUID test file (unless there's already one you'd want us to use) and include an E2E UUID test along with the other E2E tests in our second to last PR. Our final PR would then contain the completed arrow-avro implementations of the ReaderBuilder and Reader.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(unless there's already one you'd want us to use)

Not that i know of -- but I am not familar with apache avro -- if there is some sort of standardized test corpus it would be great to use that

I took a look around quickly but couldn't find anything like https://github.com/apache/parquet-testing for avro 🤔

let avro_type = avro_from_codec(Codec::Fixed(3));
let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");

let data1 = [1u8, 2, 3];
let mut cursor1 = AvroCursor::new(&data1);
decoder
.decode(&mut cursor1)
.expect("Failed to decode data1");
assert_eq!(cursor1.position(), 3, "Cursor should advance by fixed size");

let data2 = [4u8, 5, 6];
let mut cursor2 = AvroCursor::new(&data2);
decoder
.decode(&mut cursor2)
.expect("Failed to decode data2");
assert_eq!(cursor2.position(), 3, "Cursor should advance by fixed size");

let array = decoder.flush(None).expect("Failed to flush decoder");

assert_eq!(array.len(), 2, "Array should contain two items");
let fixed_size_binary_array = array
.as_any()
.downcast_ref::<FixedSizeBinaryArray>()
.expect("Failed to downcast to FixedSizeBinaryArray");

assert_eq!(
fixed_size_binary_array.value_length(),
3,
"Fixed size of binary values should be 3"
);
assert_eq!(
fixed_size_binary_array.value(0),
&[1, 2, 3],
"First item mismatch"
);
assert_eq!(
fixed_size_binary_array.value(1),
&[4, 5, 6],
"Second item mismatch"
);
}

#[test]
fn test_fixed_decoding_empty() {
let avro_type = avro_from_codec(Codec::Fixed(5));
let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");

let array = decoder
.flush(None)
.expect("Failed to flush decoder for empty input");

assert_eq!(array.len(), 0, "Array should be empty");
let fixed_size_binary_array = array
.as_any()
.downcast_ref::<FixedSizeBinaryArray>()
.expect("Failed to downcast to FixedSizeBinaryArray for empty array");

assert_eq!(
fixed_size_binary_array.value_length(),
5,
"Fixed size of binary values should be 5 as per type"
);
}

#[test]
fn test_uuid_decoding() {
let avro_type = avro_from_codec(Codec::Uuid);
let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");

let data1 = [1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
let mut cursor1 = AvroCursor::new(&data1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to extend this test, perhaps in a follow on PR, to ensure the UUID was read correctly

decoder
.decode(&mut cursor1)
.expect("Failed to decode data1");
assert_eq!(
cursor1.position(),
16,
"Cursor should advance by fixed size"
);
}

#[test]
fn test_array_decoding() {
let item_dt = avro_from_codec(Codec::Int32);
Expand Down
Loading