Skip to content

Commit 02e06c5

Browse files
jecsand838 and mbrobbel authored
Add arrow-avro support for Duration type and minor fixes for UUID decoding (#7889)
# Which issue does this PR close?

- Part of #4886
- Related to #6965

# Rationale for this change

The `arrow-avro` crate currently lacks support for the Avro `duration` type, which is a standard and commonly used type in Avro schemas. This omission prevents users from reading Avro files containing duration types, limiting the crate's utility. This change introduces support for decoding Avro duration types by mapping them to the Arrow `Interval` type. This is a logical and efficient representation. Implementing this feature brings the `arrow-avro` crate closer to full Avro specification compliance and makes it more robust for real-world use cases.

# What changes are included in this PR?

This PR contains:

* arrow-avro decoder support for Duration types.
* Minor fixes to UUID decoding. UUID types now map to the `utf8` type to better align with the [Avro specification](https://avro.apache.org/docs/1.11.1/specification/#uuid)
* New integration test using a temporary `duration_uuid.avro` file created using this Python script: https://gist.github.com/jecsand838/cbdaaf581af78f357778bf87d2f3cf15

# Are these changes tested?

Yes, this PR includes integration and unit tests covering these modifications.

# Are there any user-facing changes?

N/A

# Follow-Up PRs

1. PR to update `test_duration_uuid` once apache/arrow-testing#108 is merged in.

---------

Co-authored-by: Matthijs Brobbel <[email protected]>
1 parent 52fd59c commit 02e06c5

File tree

5 files changed

+222
-27
lines changed

5 files changed

+222
-27
lines changed

arrow-avro/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ all-features = true
3939
default = ["deflate", "snappy", "zstd", "bzip2", "xz"]
4040
deflate = ["flate2"]
4141
snappy = ["snap", "crc"]
42+
canonical_extension_types = ["arrow-schema/canonical_extension_types"]
4243

4344
[dependencies]
4445
arrow-schema = { workspace = true }
@@ -52,6 +53,7 @@ zstd = { version = "0.13", default-features = false, optional = true }
5253
bzip2 = { version = "0.4.4", default-features = false, optional = true }
5354
xz = { version = "0.1", default-features = false, optional = true }
5455
crc = { version = "3.0", optional = true }
56+
uuid = "1.17"
5557

5658
[dev-dependencies]
5759
rand = { version = "0.9.1", default-features = false, features = ["std", "std_rng", "thread_rng"] }

arrow-avro/src/codec.rs

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,14 @@ pub enum Nullability {
3737
NullSecond,
3838
}
3939

40+
#[cfg(feature = "canonical_extension_types")]
41+
fn with_extension_type(codec: &Codec, field: Field) -> Field {
42+
match codec {
43+
Codec::Uuid => field.with_extension_type(arrow_schema::extension::Uuid),
44+
_ => field,
45+
}
46+
}
47+
4048
/// An Avro datatype mapped to the arrow data model
4149
#[derive(Debug, Clone)]
4250
pub struct AvroDataType {
@@ -61,8 +69,13 @@ impl AvroDataType {
6169

6270
/// Returns an arrow [`Field`] with the given name
6371
pub fn field_with_name(&self, name: &str) -> Field {
64-
let d = self.codec.data_type();
65-
Field::new(name, d, self.nullability.is_some()).with_metadata(self.metadata.clone())
72+
let nullable = self.nullability.is_some();
73+
let data_type = self.codec.data_type();
74+
let field = Field::new(name, data_type, nullable).with_metadata(self.metadata.clone());
75+
#[cfg(feature = "canonical_extension_types")]
76+
return with_extension_type(&self.codec, field);
77+
#[cfg(not(feature = "canonical_extension_types"))]
78+
field
6679
}
6780

6881
/// Returns a reference to the codec used by this data type
@@ -200,7 +213,7 @@ pub enum Codec {
200213
/// - `scale` (`Option<usize>`): Number of fractional digits.
201214
/// - `fixed_size` (`Option<usize>`): Size in bytes if backed by a `fixed` type, otherwise `None`.
202215
Decimal(usize, Option<usize>, Option<usize>),
203-
/// Represents Avro Uuid type, a FixedSizeBinary with a length of 16
216+
/// Represents Avro Uuid type, a FixedSizeBinary with a length of 16.
204217
Uuid,
205218
/// Represents an Avro enum, maps to Arrow's Dictionary(Int32, Utf8) type.
206219
///
@@ -479,6 +492,18 @@ fn make_data_type<'a>(
479492
codec: Codec::Decimal(precision, Some(scale), Some(size as usize)),
480493
}
481494
}
495+
Some("duration") => {
496+
if size != 12 {
497+
return Err(ArrowError::ParseError(format!(
498+
"Invalid fixed size for Duration: {size}, must be 12"
499+
)));
500+
};
501+
AvroDataType {
502+
nullability: None,
503+
metadata: md,
504+
codec: Codec::Interval,
505+
}
506+
}
482507
_ => AvroDataType {
483508
nullability: None,
484509
metadata: md,
@@ -543,7 +568,6 @@ fn make_data_type<'a>(
543568
(Some("local-timestamp-micros"), c @ Codec::Int64) => {
544569
*c = Codec::TimestampMicros(false)
545570
}
546-
(Some("duration"), c @ Codec::Fixed(12)) => *c = Codec::Interval,
547571
(Some("uuid"), c @ Codec::Utf8) => *c = Codec::Uuid,
548572
(Some(logical), _) => {
549573
// Insert unrecognized logical type into metadata map

arrow-avro/src/reader/mod.rs

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -397,9 +397,9 @@ mod test {
397397
use crate::reader::vlq::VLQDecoder;
398398
use crate::reader::{read_header, Decoder, ReaderBuilder};
399399
use crate::test_util::arrow_test_data;
400-
use arrow_array::types::Int32Type;
400+
use arrow_array::types::{Int32Type, IntervalMonthDayNanoType};
401401
use arrow_array::*;
402-
use arrow_schema::{ArrowError, DataType, Field, Schema};
402+
use arrow_schema::{ArrowError, DataType, Field, IntervalUnit, Schema};
403403
use bytes::{Buf, BufMut, Bytes};
404404
use futures::executor::block_on;
405405
use futures::{stream, Stream, StreamExt, TryStreamExt};
@@ -796,4 +796,65 @@ mod test {
796796
assert_eq!(actual2, expected);
797797
}
798798
}
799+
800+
#[test]
801+
fn test_duration_uuid() {
802+
let batch = read_file("test/data/duration_uuid.avro", 4, false);
803+
let schema = batch.schema();
804+
let fields = schema.fields();
805+
assert_eq!(fields.len(), 2);
806+
assert_eq!(fields[0].name(), "duration_field");
807+
assert_eq!(
808+
fields[0].data_type(),
809+
&DataType::Interval(IntervalUnit::MonthDayNano)
810+
);
811+
assert_eq!(fields[1].name(), "uuid_field");
812+
assert_eq!(fields[1].data_type(), &DataType::FixedSizeBinary(16));
813+
assert_eq!(batch.num_rows(), 4);
814+
assert_eq!(batch.num_columns(), 2);
815+
let duration_array = batch
816+
.column(0)
817+
.as_any()
818+
.downcast_ref::<IntervalMonthDayNanoArray>()
819+
.unwrap();
820+
let expected_duration_array: IntervalMonthDayNanoArray = [
821+
Some(IntervalMonthDayNanoType::make_value(1, 15, 500_000_000)),
822+
Some(IntervalMonthDayNanoType::make_value(0, 5, 2_500_000_000)),
823+
Some(IntervalMonthDayNanoType::make_value(2, 0, 0)),
824+
Some(IntervalMonthDayNanoType::make_value(12, 31, 999_000_000)),
825+
]
826+
.iter()
827+
.copied()
828+
.collect();
829+
assert_eq!(&expected_duration_array, duration_array);
830+
let uuid_array = batch
831+
.column(1)
832+
.as_any()
833+
.downcast_ref::<FixedSizeBinaryArray>()
834+
.unwrap();
835+
let expected_uuid_array = FixedSizeBinaryArray::try_from_sparse_iter_with_size(
836+
[
837+
Some([
838+
0xfe, 0x7b, 0xc3, 0x0b, 0x4c, 0xe8, 0x4c, 0x5e, 0xb6, 0x7c, 0x22, 0x34, 0xa2,
839+
0xd3, 0x8e, 0x66,
840+
]),
841+
Some([
842+
0xb3, 0x3f, 0x2a, 0xd7, 0x97, 0xb4, 0x4d, 0xe1, 0x8b, 0xfe, 0x94, 0x94, 0x1d,
843+
0x60, 0x15, 0x6e,
844+
]),
845+
Some([
846+
0x5f, 0x74, 0x92, 0x64, 0x07, 0x4b, 0x40, 0x05, 0x84, 0xbf, 0x11, 0x5e, 0xa8,
847+
0x4e, 0xd2, 0x0a,
848+
]),
849+
Some([
850+
0x08, 0x26, 0xcc, 0x06, 0xd2, 0xe3, 0x45, 0x99, 0xb4, 0xad, 0xaf, 0x5f, 0xa6,
851+
0x90, 0x5c, 0xdb,
852+
]),
853+
]
854+
.into_iter(),
855+
16,
856+
)
857+
.unwrap();
858+
assert_eq!(&expected_uuid_array, uuid_array);
859+
}
799860
}

0 commit comments

Comments
 (0)