Skip to content

Commit 749d435

Browse files
committed
Add support for Duration type and minor fixes for UUID in arrow-avro:
- Fixed `Uuid` support: it is now represented as `Utf8` in Arrow, and testing logic was added.
- Added `Duration` support, mapped to Arrow's `IntervalMonthDayNano`, with schema handling, decoding, and integration tests.
- Updated `Cargo.toml` to include the `uuid` crate as a dev dependency for UUID checking.
- Added integration tests with the new `duration_uuid.avro` test file.
1 parent 9246872 commit 749d435

File tree

5 files changed

+162
-23
lines changed

5 files changed

+162
-23
lines changed

arrow-avro/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ rand = { version = "0.9", default-features = false, features = ["std", "std_rng"
5656
criterion = { version = "0.6.0", default-features = false }
5757
tempfile = "3.3"
5858
arrow = { workspace = true }
59+
uuid = { version = "1.17.0", features = ["v4"] }
5960

6061
[[bench]]
6162
name = "avro_reader"

arrow-avro/src/codec.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ impl Codec {
256256
Decimal128(p, s)
257257
}
258258
}
259-
Self::Uuid => DataType::FixedSizeBinary(16),
259+
Self::Uuid => DataType::Utf8,
260260
Self::Enum(_) => {
261261
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
262262
}
@@ -480,6 +480,18 @@ fn make_data_type<'a>(
480480
codec: Codec::Decimal(precision, Some(scale), Some(size as usize)),
481481
}
482482
}
483+
Some("duration") => {
484+
if size != 12 {
485+
return Err(ArrowError::ParseError(format!(
486+
"Invalid fixed size for Duration: {size}, must be 12"
487+
)));
488+
};
489+
AvroDataType {
490+
nullability: None,
491+
metadata: md,
492+
codec: Codec::Interval,
493+
}
494+
}
483495
_ => AvroDataType {
484496
nullability: None,
485497
metadata: md,
@@ -544,7 +556,6 @@ fn make_data_type<'a>(
544556
(Some("local-timestamp-micros"), c @ Codec::Int64) => {
545557
*c = Codec::TimestampMicros(false)
546558
}
547-
(Some("duration"), c @ Codec::Fixed(12)) => *c = Codec::Interval,
548559
(Some("uuid"), c @ Codec::Utf8) => *c = Codec::Uuid,
549560
(Some(logical), _) => {
550561
// Insert unrecognized logical type into metadata map

arrow-avro/src/reader/mod.rs

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,13 +121,14 @@ mod test {
121121
use crate::reader::record::RecordDecoder;
122122
use crate::reader::{read_blocks, read_header};
123123
use crate::test_util::arrow_test_data;
124-
use arrow_array::types::Int32Type;
124+
use arrow_array::types::{Int32Type, IntervalMonthDayNanoType};
125125
use arrow_array::*;
126-
use arrow_schema::{DataType, Field, Schema};
126+
use arrow_schema::{DataType, Field, IntervalUnit, Schema};
127127
use std::collections::HashMap;
128128
use std::fs::File;
129129
use std::io::BufReader;
130130
use std::sync::Arc;
131+
use uuid::Uuid;
131132

132133
fn read_file(file: &str, batch_size: usize) -> RecordBatch {
133134
read_file_with_options(file, batch_size, &crate::ReadOptions::default())
@@ -440,4 +441,45 @@ mod test {
440441
assert_eq!(actual2, expected);
441442
}
442443
}
444+
445+
#[test]
446+
fn test_duration_uuid() {
447+
let batch = read_file("test/data/duration_uuid.avro", 4);
448+
let schema = batch.schema();
449+
let fields = schema.fields();
450+
assert_eq!(fields.len(), 2);
451+
assert_eq!(fields[0].name(), "duration_field");
452+
assert_eq!(
453+
fields[0].data_type(),
454+
&DataType::Interval(IntervalUnit::MonthDayNano)
455+
);
456+
assert_eq!(fields[1].name(), "uuid_field");
457+
assert_eq!(fields[1].data_type(), &DataType::Utf8);
458+
assert_eq!(batch.num_rows(), 4);
459+
assert_eq!(batch.num_columns(), 2);
460+
let duration_array = batch
461+
.column(0)
462+
.as_any()
463+
.downcast_ref::<IntervalMonthDayNanoArray>()
464+
.unwrap();
465+
let expected_duration_array: IntervalMonthDayNanoArray = [
466+
Some(IntervalMonthDayNanoType::make_value(1, 15, 500_000_000)),
467+
Some(IntervalMonthDayNanoType::make_value(0, 5, 2_500_000_000)),
468+
Some(IntervalMonthDayNanoType::make_value(2, 0, 0)),
469+
Some(IntervalMonthDayNanoType::make_value(12, 31, 999_000_000)),
470+
]
471+
.iter()
472+
.copied()
473+
.collect();
474+
assert_eq!(&expected_duration_array, duration_array);
475+
let uuid_array = batch
476+
.column(1)
477+
.as_any()
478+
.downcast_ref::<StringArray>()
479+
.unwrap();
480+
for i in 0..uuid_array.len() {
481+
assert!(uuid_array.is_valid(i));
482+
assert!(Uuid::parse_str(uuid_array.value(i)).is_ok());
483+
}
484+
}
443485
}

arrow-avro/src/reader/record.rs

Lines changed: 104 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,16 @@ use crate::reader::cursor::AvroCursor;
2121
use crate::reader::header::Header;
2222
use crate::reader::ReadOptions;
2323
use crate::schema::*;
24-
use arrow_array::builder::{Decimal128Builder, Decimal256Builder};
24+
use arrow_array::builder::{
25+
ArrayBuilder, Decimal128Builder, Decimal256Builder, IntervalMonthDayNanoBuilder,
26+
PrimitiveBuilder,
27+
};
2528
use arrow_array::types::*;
2629
use arrow_array::*;
2730
use arrow_buffer::*;
2831
use arrow_schema::{
29-
ArrowError, DataType, Field as ArrowField, FieldRef, Fields, Schema as ArrowSchema, SchemaRef,
30-
DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION,
32+
ArrowError, DataType, Field as ArrowField, FieldRef, Fields, IntervalUnit,
33+
Schema as ArrowSchema, SchemaRef, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION,
3134
};
3235
use std::cmp::Ordering;
3336
use std::collections::HashMap;
@@ -128,15 +131,15 @@ enum Decoder {
128131
),
129132
Fixed(i32, Vec<u8>),
130133
Enum(Vec<i32>, Arc<[String]>),
134+
Duration(IntervalMonthDayNanoBuilder),
135+
Uuid(OffsetBufferBuilder<i32>, Vec<u8>),
131136
Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder),
132137
Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder),
133138
Nullable(Nullability, NullBufferBuilder, Box<Decoder>),
134139
}
135140

136141
impl Decoder {
137142
fn try_new(data_type: &AvroDataType) -> Result<Self, ArrowError> {
138-
let nyi = |s: &str| Err(ArrowError::NotYetImplemented(s.to_string()));
139-
140143
let decoder = match data_type.codec() {
141144
Codec::Null => Self::Null(0),
142145
Codec::Boolean => Self::Boolean(BooleanBufferBuilder::new(DEFAULT_CAPACITY)),
@@ -205,7 +208,7 @@ impl Decoder {
205208
}
206209
}
207210
}
208-
Codec::Interval => return nyi("decoding interval"),
211+
Codec::Interval => Self::Duration(IntervalMonthDayNanoBuilder::new()),
209212
Codec::List(item) => {
210213
let decoder = Self::try_new(item)?;
211214
Self::Array(
@@ -246,7 +249,10 @@ impl Decoder {
246249
Box::new(val_dec),
247250
)
248251
}
249-
Codec::Uuid => Self::Fixed(16, Vec::with_capacity(DEFAULT_CAPACITY)),
252+
Codec::Uuid => Self::Uuid(
253+
OffsetBufferBuilder::new(DEFAULT_CAPACITY),
254+
Vec::with_capacity(DEFAULT_CAPACITY),
255+
),
250256
};
251257
Ok(match data_type.nullability() {
252258
Some(nullability) => Self::Nullable(
@@ -270,7 +276,10 @@ impl Decoder {
270276
| Self::TimestampMicros(_, v) => v.push(0),
271277
Self::Float32(v) => v.push(0.),
272278
Self::Float64(v) => v.push(0.),
273-
Self::Binary(offsets, _) | Self::String(offsets, _) | Self::StringView(offsets, _) => {
279+
Self::Binary(offsets, _)
280+
| Self::String(offsets, _)
281+
| Self::StringView(offsets, _)
282+
| Self::Uuid(offsets, _) => {
274283
offsets.push_length(0);
275284
}
276285
Self::Array(_, offsets, e) => {
@@ -287,6 +296,7 @@ impl Decoder {
287296
Self::Decimal128(_, _, _, builder) => builder.append_value(0),
288297
Self::Decimal256(_, _, _, builder) => builder.append_value(i256::ZERO),
289298
Self::Enum(indices, _) => indices.push(0),
299+
Self::Duration(builder) => builder.append_null(),
290300
Self::Nullable(_, _, _) => unreachable!("Nulls cannot be nested"),
291301
}
292302
}
@@ -307,7 +317,8 @@ impl Decoder {
307317
Self::Float64(values) => values.push(buf.get_double()?),
308318
Self::Binary(offsets, values)
309319
| Self::String(offsets, values)
310-
| Self::StringView(offsets, values) => {
320+
| Self::StringView(offsets, values)
321+
| Self::Uuid(offsets, values) => {
311322
let data = buf.get_bytes()?;
312323
offsets.push_length(data.len());
313324
values.extend_from_slice(data);
@@ -357,6 +368,14 @@ impl Decoder {
357368
Self::Enum(indices, _) => {
358369
indices.push(buf.get_int()?);
359370
}
371+
Self::Duration(builder) => {
372+
let b = buf.get_fixed(12)?;
373+
let months = u32::from_le_bytes(b[0..4].try_into().unwrap());
374+
let days = u32::from_le_bytes(b[4..8].try_into().unwrap());
375+
let millis = u32::from_le_bytes(b[8..12].try_into().unwrap());
376+
let nanos = (millis as i64) * 1_000_000;
377+
builder.append_value(IntervalMonthDayNano::new(months as i32, days as i32, nanos));
378+
}
360379
Self::Nullable(nullability, nulls, e) => {
361380
let is_valid = buf.get_bool()? == matches!(nullability, Nullability::NullFirst);
362381
nulls.append(is_valid);
@@ -399,7 +418,7 @@ impl Decoder {
399418
let values = flush_values(values).into();
400419
Arc::new(BinaryArray::new(offsets, values, nulls))
401420
}
402-
Self::String(offsets, values) => {
421+
Self::String(offsets, values) | Self::Uuid(offsets, values) => {
403422
let offsets = flush_offsets(offsets);
404423
let values = flush_values(values).into();
405424
Arc::new(StringArray::new(offsets, values, nulls))
@@ -417,7 +436,6 @@ impl Decoder {
417436
}
418437
})
419438
.collect();
420-
421439
Arc::new(StringViewArray::from(values))
422440
}
423441
Self::Array(field, offsets, values) => {
@@ -472,17 +490,15 @@ impl Decoder {
472490
Arc::new(arr)
473491
}
474492
Self::Decimal128(precision, scale, _, builder) => {
475-
let mut b = std::mem::take(builder);
476-
let (_, vals, _) = b.finish().into_parts();
493+
let (_, vals, _) = builder.finish().into_parts();
477494
let scl = scale.unwrap_or(0);
478495
let dec = Decimal128Array::new(vals, nulls)
479496
.with_precision_and_scale(*precision as u8, scl as i8)
480497
.map_err(|e| ArrowError::ParseError(e.to_string()))?;
481498
Arc::new(dec)
482499
}
483500
Self::Decimal256(precision, scale, _, builder) => {
484-
let mut b = std::mem::take(builder);
485-
let (_, vals, _) = b.finish().into_parts();
501+
let (_, vals, _) = builder.finish().into_parts();
486502
let scl = scale.unwrap_or(0);
487503
let dec = Decimal256Array::new(vals, nulls)
488504
.with_precision_and_scale(*precision as u8, scl as i8)
@@ -496,6 +512,12 @@ impl Decoder {
496512
));
497513
Arc::new(DictionaryArray::try_new(keys, values)?)
498514
}
515+
Self::Duration(builder) => {
516+
let (_, vals, _) = builder.finish().into_parts();
517+
let vals = IntervalMonthDayNanoArray::try_new(vals, nulls)
518+
.map_err(|e| ArrowError::ParseError(e.to_string()))?;
519+
Arc::new(vals)
520+
}
499521
})
500522
}
501523
}
@@ -744,16 +766,16 @@ mod tests {
744766
fn test_uuid_decoding() {
745767
let avro_type = avro_from_codec(Codec::Uuid);
746768
let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
747-
748-
let data1 = [1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
769+
let uuid_bytes = [1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
770+
let data1 = encode_avro_bytes(&uuid_bytes);
749771
let mut cursor1 = AvroCursor::new(&data1);
750772
decoder
751773
.decode(&mut cursor1)
752774
.expect("Failed to decode data1");
753775
assert_eq!(
754776
cursor1.position(),
755-
16,
756-
"Cursor should advance by fixed size"
777+
17,
778+
"Cursor should advance by varint size + data size"
757779
);
758780
}
759781

@@ -1035,4 +1057,67 @@ mod tests {
10351057
assert_eq!(values.value(0), "X");
10361058
assert_eq!(values.value(1), "Y");
10371059
}
1060+
1061+
// Decodes three entries (value, null, value) through a nullable `Codec::Interval` decoder
// and checks the validity bits and the millisecond-to-nanosecond conversion.
#[test]
1062+
fn test_duration_decoding_with_nulls() {
1063+
let duration_codec = Codec::Interval;
1064+
let avro_type = AvroDataType::new(
1065+
duration_codec,
1066+
Default::default(),
1067+
Some(Nullability::NullFirst),
1068+
);
1069+
let mut decoder = Decoder::try_new(&avro_type).unwrap();
1070+
let mut data = Vec::new();
1071+
// First value: 1 month, 2 days, 3 millis
1072+
data.extend_from_slice(&encode_avro_long(1)); // not null
1073+
// Payload is 12 bytes: three little-endian u32 values (months, days, millis).
let mut duration1 = Vec::new();
1074+
duration1.extend_from_slice(&1u32.to_le_bytes());
1075+
duration1.extend_from_slice(&2u32.to_le_bytes());
1076+
duration1.extend_from_slice(&3u32.to_le_bytes());
1077+
data.extend_from_slice(&duration1);
1078+
// Second value: null
1079+
data.extend_from_slice(&encode_avro_long(0)); // null
1080+
// Third value: 4 months, 5 days, 6 millis
data.extend_from_slice(&encode_avro_long(1)); // not null
1081+
let mut duration2 = Vec::new();
1082+
duration2.extend_from_slice(&4u32.to_le_bytes());
1083+
duration2.extend_from_slice(&5u32.to_le_bytes());
1084+
duration2.extend_from_slice(&6u32.to_le_bytes());
1085+
data.extend_from_slice(&duration2);
1086+
let mut cursor = AvroCursor::new(&data);
1087+
// One `decode` call per encoded entry (three in total).
decoder.decode(&mut cursor).unwrap();
1088+
decoder.decode(&mut cursor).unwrap();
1089+
decoder.decode(&mut cursor).unwrap();
1090+
let array = decoder.flush(None).unwrap();
1091+
let interval_array = array
1092+
.as_any()
1093+
.downcast_ref::<IntervalMonthDayNanoArray>()
1094+
.unwrap();
1095+
assert_eq!(interval_array.len(), 3);
1096+
assert!(interval_array.is_valid(0));
1097+
assert!(interval_array.is_null(1));
1098+
assert!(interval_array.is_valid(2));
1099+
// Milliseconds are expected to come back as nanoseconds (3 ms -> 3_000_000 ns).
let expected = IntervalMonthDayNanoArray::from(vec![
1100+
Some(IntervalMonthDayNano {
1101+
months: 1,
1102+
days: 2,
1103+
nanoseconds: 3_000_000,
1104+
}),
1105+
None,
1106+
Some(IntervalMonthDayNano {
1107+
months: 4,
1108+
days: 5,
1109+
nanoseconds: 6_000_000,
1110+
}),
1111+
]);
1112+
assert_eq!(interval_array, &expected);
1113+
}
1114+
1115+
// Flushing a non-nullable `Codec::Interval` decoder that has decoded nothing must succeed
// and yield an empty array.
#[test]
1116+
fn test_duration_decoding_empty() {
1117+
let duration_codec = Codec::Interval;
1118+
let avro_type = AvroDataType::new(duration_codec, Default::default(), None); // `None`: no nullability
1119+
let mut decoder = Decoder::try_new(&avro_type).unwrap();
1120+
let array = decoder.flush(None).unwrap(); // flushed without any prior `decode` call
1121+
assert_eq!(array.len(), 0);
1122+
}
10381123
}
517 Bytes
Binary file not shown.

0 commit comments

Comments
 (0)