diff --git a/arrow-avro/Cargo.toml b/arrow-avro/Cargo.toml index 8897061aa7da..46ec76be14fb 100644 --- a/arrow-avro/Cargo.toml +++ b/arrow-avro/Cargo.toml @@ -39,6 +39,7 @@ all-features = true default = ["deflate", "snappy", "zstd", "bzip2", "xz"] deflate = ["flate2"] snappy = ["snap", "crc"] +canonical_extension_types = ["arrow-schema/canonical_extension_types"] [dependencies] arrow-schema = { workspace = true } @@ -52,6 +53,7 @@ zstd = { version = "0.13", default-features = false, optional = true } bzip2 = { version = "0.4.4", default-features = false, optional = true } xz = { version = "0.1", default-features = false, optional = true } crc = { version = "3.0", optional = true } +uuid = "1.17" [dev-dependencies] rand = { version = "0.9.1", default-features = false, features = ["std", "std_rng", "thread_rng"] } diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs index 399037fdf9f7..88b30a6d49b4 100644 --- a/arrow-avro/src/codec.rs +++ b/arrow-avro/src/codec.rs @@ -37,6 +37,14 @@ pub enum Nullability { NullSecond, } +#[cfg(feature = "canonical_extension_types")] +fn with_extension_type(codec: &Codec, field: Field) -> Field { + match codec { + Codec::Uuid => field.with_extension_type(arrow_schema::extension::Uuid), + _ => field, + } +} + /// An Avro datatype mapped to the arrow data model #[derive(Debug, Clone)] pub struct AvroDataType { @@ -61,8 +69,13 @@ impl AvroDataType { /// Returns an arrow [`Field`] with the given name pub fn field_with_name(&self, name: &str) -> Field { - let d = self.codec.data_type(); - Field::new(name, d, self.nullability.is_some()).with_metadata(self.metadata.clone()) + let nullable = self.nullability.is_some(); + let data_type = self.codec.data_type(); + let field = Field::new(name, data_type, nullable).with_metadata(self.metadata.clone()); + #[cfg(feature = "canonical_extension_types")] + return with_extension_type(&self.codec, field); + #[cfg(not(feature = "canonical_extension_types"))] + field } /// Returns a reference to the codec used by this data type @@ -200,7 +213,7 @@ pub enum Codec { /// - `scale` (`Option`): Number of fractional digits. /// - `fixed_size` (`Option`): Size in bytes if backed by a `fixed` type, otherwise `None`. Decimal(usize, Option, Option), - /// Represents Avro Uuid type, a FixedSizeBinary with a length of 16 + /// Represents Avro Uuid type, a FixedSizeBinary with a length of 16. Uuid, /// Represents an Avro enum, maps to Arrow's Dictionary(Int32, Utf8) type. /// @@ -479,6 +492,18 @@ fn make_data_type<'a>( codec: Codec::Decimal(precision, Some(scale), Some(size as usize)), } } + Some("duration") => { + if size != 12 { + return Err(ArrowError::ParseError(format!( + "Invalid fixed size for Duration: {size}, must be 12" + ))); + }; + AvroDataType { + nullability: None, + metadata: md, + codec: Codec::Interval, + } + } _ => AvroDataType { nullability: None, metadata: md, @@ -543,7 +568,6 @@ fn make_data_type<'a>( (Some("local-timestamp-micros"), c @ Codec::Int64) => { *c = Codec::TimestampMicros(false) } - (Some("duration"), c @ Codec::Fixed(12)) => *c = Codec::Interval, (Some("uuid"), c @ Codec::Utf8) => *c = Codec::Uuid, (Some(logical), _) => { // Insert unrecognized logical type into metadata map diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs index 0c33f9f2d798..5059e41ff0a3 100644 --- a/arrow-avro/src/reader/mod.rs +++ b/arrow-avro/src/reader/mod.rs @@ -397,9 +397,9 @@ mod test { use crate::reader::vlq::VLQDecoder; use crate::reader::{read_header, Decoder, ReaderBuilder}; use crate::test_util::arrow_test_data; - use arrow_array::types::Int32Type; + use arrow_array::types::{Int32Type, IntervalMonthDayNanoType}; use arrow_array::*; - use arrow_schema::{ArrowError, DataType, Field, Schema}; + use arrow_schema::{ArrowError, DataType, Field, IntervalUnit, Schema}; use bytes::{Buf, BufMut, Bytes}; use futures::executor::block_on; use futures::{stream, Stream, StreamExt, TryStreamExt}; @@ -796,4 +796,65 @@ mod test { assert_eq!(actual2, expected); } } + + #[test] + fn test_duration_uuid() { + let batch = read_file("test/data/duration_uuid.avro", 4, false); + let schema = batch.schema(); + let fields = schema.fields(); + assert_eq!(fields.len(), 2); + assert_eq!(fields[0].name(), "duration_field"); + assert_eq!( + fields[0].data_type(), + &DataType::Interval(IntervalUnit::MonthDayNano) + ); + assert_eq!(fields[1].name(), "uuid_field"); + assert_eq!(fields[1].data_type(), &DataType::FixedSizeBinary(16)); + assert_eq!(batch.num_rows(), 4); + assert_eq!(batch.num_columns(), 2); + let duration_array = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let expected_duration_array: IntervalMonthDayNanoArray = [ + Some(IntervalMonthDayNanoType::make_value(1, 15, 500_000_000)), + Some(IntervalMonthDayNanoType::make_value(0, 5, 2_500_000_000)), + Some(IntervalMonthDayNanoType::make_value(2, 0, 0)), + Some(IntervalMonthDayNanoType::make_value(12, 31, 999_000_000)), + ] + .iter() + .copied() + .collect(); + assert_eq!(&expected_duration_array, duration_array); + let uuid_array = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + let expected_uuid_array = FixedSizeBinaryArray::try_from_sparse_iter_with_size( + [ + Some([ + 0xfe, 0x7b, 0xc3, 0x0b, 0x4c, 0xe8, 0x4c, 0x5e, 0xb6, 0x7c, 0x22, 0x34, 0xa2, + 0xd3, 0x8e, 0x66, + ]), + Some([ + 0xb3, 0x3f, 0x2a, 0xd7, 0x97, 0xb4, 0x4d, 0xe1, 0x8b, 0xfe, 0x94, 0x94, 0x1d, + 0x60, 0x15, 0x6e, + ]), + Some([ + 0x5f, 0x74, 0x92, 0x64, 0x07, 0x4b, 0x40, 0x05, 0x84, 0xbf, 0x11, 0x5e, 0xa8, + 0x4e, 0xd2, 0x0a, + ]), + Some([ + 0x08, 0x26, 0xcc, 0x06, 0xd2, 0xe3, 0x45, 0x99, 0xb4, 0xad, 0xaf, 0x5f, 0xa6, + 0x90, 0x5c, 0xdb, + ]), + ] + .into_iter(), + 16, + ) + .unwrap(); + assert_eq!(&expected_uuid_array, uuid_array); + } } diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs index 972a416a6a51..0a4d47ad24e0 100644 --- a/arrow-avro/src/reader/record.rs +++ b/arrow-avro/src/reader/record.rs @@ -20,18 +20,22 @@ use crate::reader::block::{Block, BlockDecoder}; use crate::reader::cursor::AvroCursor; use crate::reader::header::Header; use crate::schema::*; -use arrow_array::builder::{Decimal128Builder, Decimal256Builder}; +use arrow_array::builder::{ + ArrayBuilder, Decimal128Builder, Decimal256Builder, IntervalMonthDayNanoBuilder, + PrimitiveBuilder, +}; use arrow_array::types::*; use arrow_array::*; use arrow_buffer::*; use arrow_schema::{ - ArrowError, DataType, Field as ArrowField, FieldRef, Fields, Schema as ArrowSchema, SchemaRef, - DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, + ArrowError, DataType, Field as ArrowField, FieldRef, Fields, IntervalUnit, + Schema as ArrowSchema, SchemaRef, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, }; use std::cmp::Ordering; use std::collections::HashMap; use std::io::Read; use std::sync::Arc; +use uuid::Uuid; const DEFAULT_CAPACITY: usize = 1024; @@ -177,6 +181,8 @@ enum Decoder { ), Fixed(i32, Vec), Enum(Vec, Arc<[String]>), + Duration(IntervalMonthDayNanoBuilder), + Uuid(Vec), Decimal128(usize, Option, Option, Decimal128Builder), Decimal256(usize, Option, Option, Decimal256Builder), Nullable(Nullability, NullBufferBuilder, Box), @@ -184,8 +190,6 @@ enum Decoder { impl Decoder { fn try_new(data_type: &AvroDataType) -> Result { - let nyi = |s: &str| Err(ArrowError::NotYetImplemented(s.to_string())); - let decoder = match data_type.codec() { Codec::Null => Self::Null(0), Codec::Boolean => Self::Boolean(BooleanBufferBuilder::new(DEFAULT_CAPACITY)), @@ -254,7 +258,7 @@ impl Decoder { } } } - Codec::Interval => return nyi("decoding interval"), + Codec::Interval => Self::Duration(IntervalMonthDayNanoBuilder::new()), Codec::List(item) => { let decoder = Self::try_new(item)?; Self::Array( @@ -295,7 +299,7 @@ impl Decoder { Box::new(val_dec), ) } - Codec::Uuid => Self::Fixed(16, Vec::with_capacity(DEFAULT_CAPACITY)), + Codec::Uuid => Self::Uuid(Vec::with_capacity(DEFAULT_CAPACITY)), }; Ok(match data_type.nullability() { Some(nullability) => Self::Nullable( @@ -322,6 +326,9 @@ impl Decoder { Self::Binary(offsets, _) | Self::String(offsets, _) | Self::StringView(offsets, _) => { offsets.push_length(0); } + Self::Uuid(v) => { + v.extend([0; 16]); + } Self::Array(_, offsets, e) => { offsets.push_length(0); e.append_null(); @@ -336,6 +343,7 @@ impl Decoder { Self::Decimal128(_, _, _, builder) => builder.append_value(0), Self::Decimal256(_, _, _, builder) => builder.append_value(i256::ZERO), Self::Enum(indices, _) => indices.push(0), + Self::Duration(builder) => builder.append_null(), Self::Nullable(_, _, _) => unreachable!("Nulls cannot be nested"), } } @@ -361,6 +369,15 @@ impl Decoder { offsets.push_length(data.len()); values.extend_from_slice(data); } + Self::Uuid(values) => { + let s_bytes = buf.get_bytes()?; + let s = std::str::from_utf8(s_bytes).map_err(|e| { + ArrowError::ParseError(format!("UUID bytes are not valid UTF-8: {e}")) + })?; + let uuid = Uuid::try_parse(s) + .map_err(|e| ArrowError::ParseError(format!("Failed to parse uuid: {e}")))?; + values.extend_from_slice(uuid.as_bytes()); + } Self::Array(_, off, encoding) => { let total_items = read_blocks(buf, |cursor| encoding.decode(cursor))?; off.push_length(total_items); @@ -406,6 +423,14 @@ impl Decoder { Self::Enum(indices, _) => { indices.push(buf.get_int()?); } + Self::Duration(builder) => { + let b = buf.get_fixed(12)?; + let months = u32::from_le_bytes(b[0..4].try_into().unwrap()); + let days = u32::from_le_bytes(b[4..8].try_into().unwrap()); + let millis = u32::from_le_bytes(b[8..12].try_into().unwrap()); + let nanos = (millis as i64) * 1_000_000; + builder.append_value(IntervalMonthDayNano::new(months as i32, days as i32, nanos)); + } Self::Nullable(nullability, nulls, e) => { let is_valid = buf.get_bool()? == matches!(nullability, Nullability::NullFirst); nulls.append(is_valid); @@ -466,7 +491,6 @@ impl Decoder { } }) .collect(); - Arc::new(StringViewArray::from(values)) } Self::Array(field, offsets, values) => { @@ -520,9 +544,13 @@ impl Decoder { .map_err(|e| ArrowError::ParseError(e.to_string()))?; Arc::new(arr) } + Self::Uuid(values) => { + let arr = FixedSizeBinaryArray::try_new(16, std::mem::take(values).into(), nulls) + .map_err(|e| ArrowError::ParseError(e.to_string()))?; + Arc::new(arr) + } Self::Decimal128(precision, scale, _, builder) => { - let mut b = std::mem::take(builder); - let (_, vals, _) = b.finish().into_parts(); + let (_, vals, _) = builder.finish().into_parts(); let scl = scale.unwrap_or(0); let dec = Decimal128Array::new(vals, nulls) .with_precision_and_scale(*precision as u8, scl as i8) @@ -530,8 +558,7 @@ impl Decoder { Arc::new(dec) } Self::Decimal256(precision, scale, _, builder) => { - let mut b = std::mem::take(builder); - let (_, vals, _) = b.finish().into_parts(); + let (_, vals, _) = builder.finish().into_parts(); let scl = scale.unwrap_or(0); let dec = Decimal256Array::new(vals, nulls) .with_precision_and_scale(*precision as u8, scl as i8) @@ -545,10 +572,17 @@ impl Decoder { )); Arc::new(DictionaryArray::try_new(keys, values)?) } + Self::Duration(builder) => { + let (_, vals, _) = builder.finish().into_parts(); + let vals = IntervalMonthDayNanoArray::try_new(vals, nulls) + .map_err(|e| ArrowError::ParseError(e.to_string()))?; + Arc::new(vals) + } }) } } +#[inline] fn read_blocks( buf: &mut AvroCursor, decode_entry: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>, @@ -556,6 +590,7 @@ fn read_blocks( read_blockwise_items(buf, true, decode_entry) } +#[inline] fn read_blockwise_items( buf: &mut AvroCursor, read_size_after_negative: bool, @@ -793,17 +828,27 @@ mod tests { fn test_uuid_decoding() { let avro_type = avro_from_codec(Codec::Uuid); let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder"); - - let data1 = [1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]; - let mut cursor1 = AvroCursor::new(&data1); - decoder - .decode(&mut cursor1) - .expect("Failed to decode data1"); + let uuid_str = "f81d4fae-7dec-11d0-a765-00a0c91e6bf6"; + let data = encode_avro_bytes(uuid_str.as_bytes()); + let mut cursor = AvroCursor::new(&data); + decoder.decode(&mut cursor).expect("Failed to decode data"); assert_eq!( - cursor1.position(), - 16, - "Cursor should advance by fixed size" + cursor.position(), + data.len(), + "Cursor should advance by varint size + data size" ); + let array = decoder.flush(None).expect("Failed to flush decoder"); + let fixed_size_binary_array = array + .as_any() + .downcast_ref::() + .expect("Array should be a FixedSizeBinaryArray"); + assert_eq!(fixed_size_binary_array.len(), 1); + assert_eq!(fixed_size_binary_array.value_length(), 16); + let expected_bytes = [ + 0xf8, 0x1d, 0x4f, 0xae, 0x7d, 0xec, 0x11, 0xd0, 0xa7, 0x65, 0x00, 0xa0, 0xc9, 0x1e, + 0x6b, 0xf6, + ]; + assert_eq!(fixed_size_binary_array.value(0), &expected_bytes); } #[test] @@ -1084,4 +1129,67 @@ mod tests { assert_eq!(values.value(0), "X"); assert_eq!(values.value(1), "Y"); } + + #[test] + fn test_duration_decoding_with_nulls() { + let duration_codec = Codec::Interval; + let avro_type = AvroDataType::new( + duration_codec, + Default::default(), + Some(Nullability::NullFirst), + ); + let mut decoder = Decoder::try_new(&avro_type).unwrap(); + let mut data = Vec::new(); + // First value: 1 month, 2 days, 3 millis + data.extend_from_slice(&encode_avro_long(1)); // not null + let mut duration1 = Vec::new(); + duration1.extend_from_slice(&1u32.to_le_bytes()); + duration1.extend_from_slice(&2u32.to_le_bytes()); + duration1.extend_from_slice(&3u32.to_le_bytes()); + data.extend_from_slice(&duration1); + // Second value: null + data.extend_from_slice(&encode_avro_long(0)); // null + data.extend_from_slice(&encode_avro_long(1)); // not null + let mut duration2 = Vec::new(); + duration2.extend_from_slice(&4u32.to_le_bytes()); + duration2.extend_from_slice(&5u32.to_le_bytes()); + duration2.extend_from_slice(&6u32.to_le_bytes()); + data.extend_from_slice(&duration2); + let mut cursor = AvroCursor::new(&data); + decoder.decode(&mut cursor).unwrap(); + decoder.decode(&mut cursor).unwrap(); + decoder.decode(&mut cursor).unwrap(); + let array = decoder.flush(None).unwrap(); + let interval_array = array + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(interval_array.len(), 3); + assert!(interval_array.is_valid(0)); + assert!(interval_array.is_null(1)); + assert!(interval_array.is_valid(2)); + let expected = IntervalMonthDayNanoArray::from(vec![ + Some(IntervalMonthDayNano { + months: 1, + days: 2, + nanoseconds: 3_000_000, + }), + None, + Some(IntervalMonthDayNano { + months: 4, + days: 5, + nanoseconds: 6_000_000, + }), + ]); + assert_eq!(interval_array, &expected); + } + + #[test] + fn test_duration_decoding_empty() { + let duration_codec = Codec::Interval; + let avro_type = AvroDataType::new(duration_codec, Default::default(), None); + let mut decoder = Decoder::try_new(&avro_type).unwrap(); + let array = decoder.flush(None).unwrap(); + assert_eq!(array.len(), 0); + } } diff --git a/arrow-avro/test/data/duration_uuid.avro b/arrow-avro/test/data/duration_uuid.avro new file mode 100644 index 000000000000..09dd67b7807a Binary files /dev/null and b/arrow-avro/test/data/duration_uuid.avro differ