From befb73cd7741cb16054323bcacfb34992c61cc2b Mon Sep 17 00:00:00 2001 From: "veronica.manchola" Date: Thu, 17 Jul 2025 12:41:00 -0400 Subject: [PATCH 01/10] implement nullable and nonnullable impala decoder --- arrow-avro/src/reader/mod.rs | 378 +++++++++++++++++++++++++++++++- arrow-avro/src/reader/record.rs | 97 +++++--- 2 files changed, 440 insertions(+), 35 deletions(-) diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs index 5059e41ff0a3..d121ecc4f891 100644 --- a/arrow-avro/src/reader/mod.rs +++ b/arrow-avro/src/reader/mod.rs @@ -395,7 +395,7 @@ mod test { use crate::compression::CompressionCodec; use crate::reader::record::RecordDecoder; use crate::reader::vlq::VLQDecoder; - use crate::reader::{read_header, Decoder, ReaderBuilder}; + use crate::reader::{read_header, Decoder, Reader, ReaderBuilder}; use crate::test_util::arrow_test_data; use arrow_array::types::{Int32Type, IntervalMonthDayNanoType}; use arrow_array::*; @@ -409,6 +409,7 @@ mod test { use std::io::{BufReader, Cursor, Read}; use std::sync::Arc; use std::task::{ready, Poll}; + use arrow_array::builder::{Float64Builder, Int32Builder, ListBuilder, MapBuilder, StringBuilder, StructBuilder}; fn read_file(path: &str, batch_size: usize, utf8_view: bool) -> RecordBatch { let file = File::open(path).unwrap(); @@ -422,7 +423,16 @@ mod test { arrow::compute::concat_batches(&schema, &batches).unwrap() } - fn decode_stream + Unpin>( + fn read_file_strict(path: &str, batch_size: usize, utf8_view: bool) -> Result>, ArrowError> { + let file = File::open(path).unwrap(); + ReaderBuilder::new() + .with_batch_size(batch_size) + .with_utf8_view(utf8_view) + .with_strict_mode(true) + .build(BufReader::new(file)) + } + + fn decode_stream + Unpin>( mut decoder: Decoder, mut input: S, ) -> impl Stream> { @@ -857,4 +867,368 @@ mod test { .unwrap(); assert_eq!(&expected_uuid_array, uuid_array); } + + #[test] + fn test_nonnullable_impala() { + let file = arrow_test_data("avro/nonnullable.impala.avro"); + let id = Int64Array::from(vec![Some(8)]); + let mut int_array_builder = ListBuilder::new(Int32Builder::new()); + { + let vb = int_array_builder.values(); + vb.append_value(-1); + } + int_array_builder.append(true); // finalize one sub-list + let int_array = int_array_builder.finish(); + let mut iaa_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new())); + { + let inner_list_builder = iaa_builder.values(); + { + let vb = inner_list_builder.values(); + vb.append_value(-1); + vb.append_value(-2); + } + inner_list_builder.append(true); + inner_list_builder.append(true); + } + iaa_builder.append(true); + let int_array_array = iaa_builder.finish(); + use arrow_array::builder::MapFieldNames; + let field_names = MapFieldNames { + entry: "entries".to_string(), + key: "key".to_string(), + value: "value".to_string(), + }; + let mut int_map_builder = + MapBuilder::new(Some(field_names), StringBuilder::new(), Int32Builder::new()); + { + let (keys, vals) = int_map_builder.entries(); + keys.append_value("k1"); + vals.append_value(-1); + } + int_map_builder.append(true).unwrap(); // finalize map for row 0 + let int_map = int_map_builder.finish(); + let field_names2 = MapFieldNames { + entry: "entries".to_string(), + key: "key".to_string(), + value: "value".to_string(), + }; + let mut ima_builder = ListBuilder::new(MapBuilder::new( + Some(field_names2), + StringBuilder::new(), + Int32Builder::new(), + )); + { + let map_builder = ima_builder.values(); + map_builder.append(true).unwrap(); + { + let (keys, vals) = map_builder.entries(); + keys.append_value("k1"); + vals.append_value(1); + } + map_builder.append(true).unwrap(); + map_builder.append(true).unwrap(); + map_builder.append(true).unwrap(); + } + ima_builder.append(true); + let int_map_array_ = ima_builder.finish(); + let mut nested_sb = StructBuilder::new( + vec![ + Arc::new(Field::new("a", DataType::Int32, true)), + Arc::new(Field::new( + "B", + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), + true, + )), + Arc::new(Field::new( + "c", + DataType::Struct( + vec![Field::new( + "D", + DataType::List(Arc::new(Field::new( + "item", + DataType::List(Arc::new(Field::new( + "item", + DataType::Struct( + vec![ + Field::new("e", DataType::Int32, true), + Field::new("f", DataType::Utf8, true), + ] + .into(), + ), + true, + ))), + true, + ))), + true, + )] + .into(), + ), + true, + )), + Arc::new(Field::new( + "G", + DataType::Map( + Arc::new(Field::new( + "entries", + DataType::Struct( + vec![ + Field::new("key", DataType::Utf8, false), + Field::new( + "value", + DataType::Struct( + vec![Field::new( + "h", + DataType::Struct( + vec![Field::new( + "i", + DataType::List(Arc::new(Field::new( + "item", + DataType::Float64, + true, + ))), + true, + )] + .into(), + ), + true, + )] + .into(), + ), + true, + ), + ] + .into(), + ), + false, + )), + false, + ), + true, + )), + ], + vec![ + Box::new(Int32Builder::new()), + Box::new(ListBuilder::new(Int32Builder::new())), + { + let d_field = Field::new( + "D", + DataType::List(Arc::new(Field::new( + "item", + DataType::List(Arc::new(Field::new( + "item", + DataType::Struct( + vec![ + Field::new("e", DataType::Int32, true), + Field::new("f", DataType::Utf8, true), + ] + .into(), + ), + true, + ))), + true, + ))), + true, + ); + Box::new(StructBuilder::new( + vec![Arc::new(d_field)], + vec![Box::new({ + let ef_struct_builder = StructBuilder::new( + vec![ + Arc::new(Field::new("e", DataType::Int32, true)), + Arc::new(Field::new("f", DataType::Utf8, true)), + ], + vec![ + Box::new(Int32Builder::new()), + Box::new(StringBuilder::new()), + ], + ); + let list_of_ef = ListBuilder::new(ef_struct_builder); + ListBuilder::new(list_of_ef) + })], + )) + }, + { + let map_field_names = MapFieldNames { + entry: "entries".to_string(), + key: "key".to_string(), + value: "value".to_string(), + }; + let i_list_builder = ListBuilder::new(Float64Builder::new()); + let h_struct = StructBuilder::new( + vec![Arc::new(Field::new( + "i", + DataType::List(Arc::new(Field::new("item", DataType::Float64, true))), + true, + ))], + vec![Box::new(i_list_builder)], + ); + let g_value_builder = StructBuilder::new( + vec![Arc::new(Field::new( + "h", + DataType::Struct( + vec![Field::new( + "i", + DataType::List(Arc::new(Field::new( + "item", + DataType::Float64, + true, + ))), + true, + )] + .into(), + ), + true, + ))], + vec![Box::new(h_struct)], + ); + Box::new(MapBuilder::new( + Some(map_field_names), + StringBuilder::new(), + g_value_builder, + )) + }, + ], + ); + nested_sb.append(true); + { + let a_builder = nested_sb.field_builder::(0).unwrap(); + a_builder.append_value(-1); + } + { + let b_builder = nested_sb + .field_builder::>(1) + .unwrap(); + { + let vb = b_builder.values(); + vb.append_value(-1); + } + b_builder.append(true); + } + { + let c_struct_builder = nested_sb.field_builder::(2).unwrap(); + c_struct_builder.append(true); + let d_list_builder = c_struct_builder + .field_builder::>>(0) + .unwrap(); + { + let sub_list_builder = d_list_builder.values(); + { + let ef_struct = sub_list_builder.values(); + ef_struct.append(true); + { + let e_b = ef_struct.field_builder::(0).unwrap(); + e_b.append_value(-1); + let f_b = ef_struct.field_builder::(1).unwrap(); + f_b.append_value("nonnullable"); + } + sub_list_builder.append(true); + } + d_list_builder.append(true); + } + } + { + let g_map_builder = nested_sb + .field_builder::>(3) + .unwrap(); + g_map_builder.append(true).unwrap(); + } + let nested_struct = nested_sb.finish(); + let expected = RecordBatch::try_from_iter_with_nullable([ + ("ID", Arc::new(id) as Arc, true), + ("Int_Array", Arc::new(int_array), true), + ("int_array_array", Arc::new(int_array_array), true), + ("Int_Map", Arc::new(int_map), true), + ("int_map_array", Arc::new(int_map_array_), true), + ("nested_Struct", Arc::new(nested_struct), true), + ]) + .unwrap(); + let batch_large = read_file(&file, 8, false); + assert_eq!(batch_large, expected, "Mismatch for batch_size=8"); + let batch_small = read_file(&file, 3, false); + assert_eq!(batch_small, expected, "Mismatch for batch_size=3"); + } + + #[test] + fn test_nonnullable_impala_strict() { + let file = arrow_test_data("avro/nonnullable.impala.avro"); + let err = read_file_strict(&file, 8, false).unwrap_err(); + assert!(err.to_string().contains( + "Found Avro union of the form ['T','null'], which is disallowed in strict_mode" + )); + } + + #[test] + fn test_nullable_impala() { + let file = arrow_test_data("avro/nullable.impala.avro"); + let batch1 = read_file(&file, 3, false); + let batch2 = read_file(&file, 8, false); + assert_eq!(batch1, batch2); + let batch = batch1; + assert_eq!(batch.num_rows(), 7); + let id_array = batch + .column(0) + .as_any() + .downcast_ref::() + .expect("id column should be an Int64Array"); + let expected_ids = [1, 2, 3, 4, 5, 6, 7]; + for (i, &expected_id) in expected_ids.iter().enumerate() { + assert_eq!( + id_array.value(i), + expected_id, + "Mismatch in id at row {}", + i + ); + } + let int_array = batch + .column(1) + .as_any() + .downcast_ref::() + .expect("int_array column should be a ListArray"); + { + let offsets = int_array.value_offsets(); + let start = offsets[0] as usize; + let end = offsets[1] as usize; + let values = int_array + .values() + .as_any() + .downcast_ref::() + .expect("Values of int_array should be an Int32Array"); + let row0: Vec> = (start..end).map(|i| Some(values.value(i))).collect(); + assert_eq!( + row0, + vec![Some(1), Some(2), Some(3)], + "Mismatch in int_array row 0" + ); + } + let nested_struct = batch + .column(5) + .as_any() + .downcast_ref::() + .expect("nested_struct column should be a StructArray"); + let a_array = nested_struct + .column_by_name("A") + .expect("Field A should exist in nested_struct") + .as_any() + .downcast_ref::() + .expect("Field A should be an Int32Array"); + assert_eq!(a_array.value(0), 1, "Mismatch in nested_struct.A at row 0"); + assert!( + !a_array.is_valid(1), + "Expected null in nested_struct.A at row 1" + ); + assert!( + !a_array.is_valid(3), + "Expected null in nested_struct.A at row 3" + ); + assert_eq!(a_array.value(6), 7, "Mismatch in nested_struct.A at row 6"); + } + + #[test] + fn test_nullable_impala_strict() { + let file = arrow_test_data("avro/nullable.impala.avro"); + let err = read_file_strict(&file, 8, false).unwrap_err(); + assert!(err.to_string().contains( + "Found Avro union of the form ['T','null'], which is disallowed in strict_mode" + )); + } } diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs index 2ef382a22671..01990072484b 100644 --- a/arrow-avro/src/reader/record.rs +++ b/arrow-avro/src/reader/record.rs @@ -111,7 +111,7 @@ impl RecordDecoder { use_utf8view: bool, strict_mode: bool, ) -> Result { - match Decoder::try_new(data_type)? { + match Decoder::try_new(data_type, strict_mode)? { Decoder::Record(fields, encodings) => Ok(Self { schema: Arc::new(ArrowSchema::new(fields)), fields: encodings, @@ -189,7 +189,7 @@ enum Decoder { } impl Decoder { - fn try_new(data_type: &AvroDataType) -> Result { + fn try_new(data_type: &AvroDataType, strict_mode: bool) -> Result { let decoder = match data_type.codec() { Codec::Null => Self::Null(0), Codec::Boolean => Self::Boolean(BooleanBufferBuilder::new(DEFAULT_CAPACITY)), @@ -260,7 +260,7 @@ impl Decoder { } Codec::Interval => Self::Duration(IntervalMonthDayNanoBuilder::new()), Codec::List(item) => { - let decoder = Self::try_new(item)?; + let decoder = Self::try_new(item, strict_mode)?; Self::Array( Arc::new(item.field_with_name("item")), OffsetBufferBuilder::new(DEFAULT_CAPACITY), @@ -274,7 +274,7 @@ impl Decoder { let mut arrow_fields = Vec::with_capacity(fields.len()); let mut encodings = Vec::with_capacity(fields.len()); for avro_field in fields.iter() { - let encoding = Self::try_new(avro_field.data_type())?; + let encoding = Self::try_new(avro_field.data_type(), strict_mode)?; arrow_fields.push(avro_field.field()); encodings.push(encoding); } @@ -290,7 +290,7 @@ impl Decoder { ])), false, )); - let val_dec = Self::try_new(child)?; + let val_dec = Self::try_new(child, strict_mode)?; Self::Map( map_field, OffsetBufferBuilder::new(DEFAULT_CAPACITY), @@ -301,9 +301,23 @@ impl Decoder { } Codec::Uuid => Self::Uuid(Vec::with_capacity(DEFAULT_CAPACITY)), }; - Ok(match data_type.nullability() { - Some(nullability) => Self::Nullable( - nullability, + let union_order = match data_type.nullability() { + None => None, + Some(Nullability::NullFirst) => Some(Nullability::NullFirst), + Some(Nullability::NullSecond) => { + if strict_mode { + return Err(ArrowError::ParseError( + "Found Avro union of the form ['T','null'], which is disallowed in strict_mode" + .to_string(), + )); + } + Some(Nullability::NullSecond) + } + }; + + Ok(match union_order { + Some(order) => Decoder::Nullable( + order, NullBufferBuilder::new(DEFAULT_CAPACITY), Box::new(decoder), ), @@ -331,7 +345,6 @@ impl Decoder { } Self::Array(_, offsets, e) => { offsets.push_length(0); - e.append_null(); } Self::Record(_, e) => e.iter_mut().for_each(|e| e.append_null()), Self::Map(_, _koff, moff, _, _) => { @@ -344,7 +357,10 @@ impl Decoder { Self::Decimal256(_, _, _, builder) => builder.append_value(i256::ZERO), Self::Enum(indices, _) => indices.push(0), Self::Duration(builder) => builder.append_null(), - Self::Nullable(_, _, _) => unreachable!("Nulls cannot be nested"), + Self::Nullable(order, null_buffer, inner) => { + null_buffer.append(false); + inner.append_null(); + }, } } @@ -431,12 +447,27 @@ impl Decoder { let nanos = (millis as i64) * 1_000_000; builder.append_value(IntervalMonthDayNano::new(months as i32, days as i32, nanos)); } - Self::Nullable(nullability, nulls, e) => { - let is_valid = buf.get_bool()? == matches!(nullability, Nullability::NullFirst); - nulls.append(is_valid); - match is_valid { - true => e.decode(buf)?, - false => e.append_null(), + Self::Nullable(order, nb, child) => { + let branch = buf.get_int()?; + match order { + Nullability::NullFirst => { + if branch == 0 { + nb.append(false); + child.append_null(); + } else { + nb.append(true); + child.decode(buf)?; + } + } + Nullability::NullSecond => { + if branch == 0 { + nb.append(true); + child.decode(buf)?; + } else { + nb.append(false); + child.append_null(); + } + } } } } @@ -717,7 +748,7 @@ mod tests { fn test_map_decoding_one_entry() { let value_type = avro_from_codec(Codec::Utf8); let map_type = avro_from_codec(Codec::Map(Arc::new(value_type))); - let mut decoder = Decoder::try_new(&map_type).unwrap(); + let mut decoder = Decoder::try_new(&map_type, false).unwrap(); // Encode a single map with one entry: {"hello": "world"} let mut data = Vec::new(); data.extend_from_slice(&encode_avro_long(1)); @@ -753,7 +784,7 @@ mod tests { fn test_map_decoding_empty() { let value_type = avro_from_codec(Codec::Utf8); let map_type = avro_from_codec(Codec::Map(Arc::new(value_type))); - let mut decoder = Decoder::try_new(&map_type).unwrap(); + let mut decoder = Decoder::try_new(&map_type, false).unwrap(); let data = encode_avro_long(0); decoder.decode(&mut AvroCursor::new(&data)).unwrap(); let array = decoder.flush(None).unwrap(); @@ -765,7 +796,7 @@ mod tests { #[test] fn test_fixed_decoding() { let avro_type = avro_from_codec(Codec::Fixed(3)); - let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder"); + let mut decoder = Decoder::try_new(&avro_type, false).expect("Failed to create decoder"); let data1 = [1u8, 2, 3]; let mut cursor1 = AvroCursor::new(&data1); @@ -805,7 +836,7 @@ mod tests { #[test] fn test_fixed_decoding_empty() { let avro_type = avro_from_codec(Codec::Fixed(5)); - let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder"); + let mut decoder = Decoder::try_new(&avro_type, false).expect("Failed to create decoder"); let array = decoder .flush(None) @@ -827,7 +858,7 @@ mod tests { #[test] fn test_uuid_decoding() { let avro_type = avro_from_codec(Codec::Uuid); - let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder"); + let mut decoder = Decoder::try_new(&avro_type, false).expect("Failed to create decoder"); let uuid_str = "f81d4fae-7dec-11d0-a765-00a0c91e6bf6"; let data = encode_avro_bytes(uuid_str.as_bytes()); let mut cursor = AvroCursor::new(&data); @@ -855,7 +886,7 @@ mod tests { fn test_array_decoding() { let item_dt = avro_from_codec(Codec::Int32); let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt))); - let mut decoder = Decoder::try_new(&list_dt).unwrap(); + let mut decoder = Decoder::try_new(&list_dt, false).unwrap(); let mut row1 = Vec::new(); row1.extend_from_slice(&encode_avro_long(2)); row1.extend_from_slice(&encode_avro_int(10)); @@ -882,7 +913,7 @@ mod tests { fn test_array_decoding_with_negative_block_count() { let item_dt = avro_from_codec(Codec::Int32); let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt))); - let mut decoder = Decoder::try_new(&list_dt).unwrap(); + let mut decoder = Decoder::try_new(&list_dt, false).unwrap(); let mut data = encode_avro_long(-3); data.extend_from_slice(&encode_avro_long(12)); data.extend_from_slice(&encode_avro_int(1)); @@ -906,7 +937,7 @@ mod tests { fn test_nested_array_decoding() { let inner_ty = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32)))); let nested_ty = avro_from_codec(Codec::List(Arc::new(inner_ty.clone()))); - let mut decoder = Decoder::try_new(&nested_ty).unwrap(); + let mut decoder = Decoder::try_new(&nested_ty, false).unwrap(); let mut buf = Vec::new(); buf.extend(encode_avro_long(1)); buf.extend(encode_avro_long(2)); @@ -935,7 +966,7 @@ mod tests { fn test_array_decoding_empty_array() { let value_type = avro_from_codec(Codec::Utf8); let map_type = avro_from_codec(Codec::List(Arc::new(value_type))); - let mut decoder = Decoder::try_new(&map_type).unwrap(); + let mut decoder = Decoder::try_new(&map_type, false).unwrap(); let data = encode_avro_long(0); decoder.decode(&mut AvroCursor::new(&data)).unwrap(); let array = decoder.flush(None).unwrap(); @@ -947,7 +978,7 @@ mod tests { #[test] fn test_decimal_decoding_fixed256() { let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(32))); - let mut decoder = Decoder::try_new(&dt).unwrap(); + let mut decoder = Decoder::try_new(&dt, false).unwrap(); let row1 = [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, @@ -974,7 +1005,7 @@ mod tests { #[test] fn test_decimal_decoding_fixed128() { let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(16))); - let mut decoder = Decoder::try_new(&dt).unwrap(); + let mut decoder = Decoder::try_new(&dt, false).unwrap(); let row1 = [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x30, 0x39, @@ -999,7 +1030,7 @@ mod tests { #[test] fn test_decimal_decoding_bytes_with_nulls() { let dt = avro_from_codec(Codec::Decimal(4, Some(1), None)); - let inner = Decoder::try_new(&dt).unwrap(); + let inner = Decoder::try_new(&dt, false).unwrap(); let mut decoder = Decoder::Nullable( Nullability::NullSecond, NullBufferBuilder::new(DEFAULT_CAPACITY), @@ -1028,7 +1059,7 @@ mod tests { #[test] fn test_decimal_decoding_bytes_with_nulls_fixed_size() { let dt = avro_from_codec(Codec::Decimal(6, Some(2), Some(16))); - let inner = Decoder::try_new(&dt).unwrap(); + let inner = Decoder::try_new(&dt, false).unwrap(); let mut decoder = Decoder::Nullable( Nullability::NullSecond, NullBufferBuilder::new(DEFAULT_CAPACITY), @@ -1066,7 +1097,7 @@ mod tests { fn test_enum_decoding() { let symbols: Arc<[String]> = vec!["A", "B", "C"].into_iter().map(String::from).collect(); let avro_type = avro_from_codec(Codec::Enum(symbols.clone())); - let mut decoder = Decoder::try_new(&avro_type).unwrap(); + let mut decoder = Decoder::try_new(&avro_type, false).unwrap(); let mut data = Vec::new(); data.extend_from_slice(&encode_avro_int(2)); data.extend_from_slice(&encode_avro_int(0)); @@ -1099,7 +1130,7 @@ mod tests { let enum_codec = Codec::Enum(symbols.clone()); let avro_type = AvroDataType::new(enum_codec, Default::default(), Some(Nullability::NullFirst)); - let mut decoder = Decoder::try_new(&avro_type).unwrap(); + let mut decoder = Decoder::try_new(&avro_type, true).unwrap(); let mut data = Vec::new(); data.extend_from_slice(&encode_avro_long(1)); data.extend_from_slice(&encode_avro_int(1)); @@ -1138,7 +1169,7 @@ mod tests { Default::default(), Some(Nullability::NullFirst), ); - let mut decoder = Decoder::try_new(&avro_type).unwrap(); + let mut decoder = Decoder::try_new(&avro_type, false).unwrap(); let mut data = Vec::new(); // First value: 1 month, 2 days, 3 millis data.extend_from_slice(&encode_avro_long(1)); // not null @@ -1188,7 +1219,7 @@ mod tests { fn test_duration_decoding_empty() { let duration_codec = Codec::Interval; let avro_type = AvroDataType::new(duration_codec, Default::default(), None); - let mut decoder = Decoder::try_new(&avro_type).unwrap(); + let mut decoder = Decoder::try_new(&avro_type, false).unwrap(); let array = decoder.flush(None).unwrap(); assert_eq!(array.len(), 0); } From 848de3b1ea27d0dd6245b9fd7c52f4e1045e45eb Mon Sep 17 00:00:00 2001 From: "veronica.manchola" Date: Thu, 17 Jul 2025 13:14:05 -0400 Subject: [PATCH 02/10] cargo format fixes --- arrow-avro/src/reader/mod.rs | 31 +++++++++++++++++++------------ arrow-avro/src/reader/record.rs | 31 +++++++++++-------------------- 2 files changed, 30 insertions(+), 32 deletions(-) diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs index d121ecc4f891..82a835296de8 100644 --- a/arrow-avro/src/reader/mod.rs +++ b/arrow-avro/src/reader/mod.rs @@ -397,6 +397,10 @@ mod test { use crate::reader::vlq::VLQDecoder; use crate::reader::{read_header, Decoder, Reader, ReaderBuilder}; use crate::test_util::arrow_test_data; + use arrow_array::builder::{ + Float64Builder, Int32Builder, ListBuilder, MapBuilder, StringBuilder, StructBuilder, + }; + use arrow_array::types::{Int32Type, IntervalMonthDayNanoType}; use arrow_array::*; use arrow_schema::{ArrowError, DataType, Field, IntervalUnit, Schema}; @@ -409,7 +413,6 @@ mod test { use std::io::{BufReader, Cursor, Read}; use std::sync::Arc; use std::task::{ready, Poll}; - use arrow_array::builder::{Float64Builder, Int32Builder, ListBuilder, MapBuilder, StringBuilder, StructBuilder}; fn read_file(path: &str, batch_size: usize, utf8_view: bool) -> RecordBatch { let file = File::open(path).unwrap(); @@ -423,7 +426,11 @@ mod test { arrow::compute::concat_batches(&schema, &batches).unwrap() } - fn read_file_strict(path: &str, batch_size: usize, utf8_view: bool) -> Result>, ArrowError> { + fn read_file_strict( + path: &str, + batch_size: usize, + utf8_view: bool, + ) -> Result>, ArrowError> { let file = File::open(path).unwrap(); ReaderBuilder::new() .with_batch_size(batch_size) @@ -432,7 +439,7 @@ mod test { .build(BufReader::new(file)) } - fn decode_stream + Unpin>( + fn decode_stream + Unpin>( mut decoder: Decoder, mut input: S, ) -> impl Stream> { @@ -953,7 +960,7 @@ mod test { Field::new("e", DataType::Int32, true), Field::new("f", DataType::Utf8, true), ] - .into(), + .into(), ), true, ))), @@ -961,7 +968,7 @@ mod test { ))), true, )] - .into(), + .into(), ), true, )), @@ -988,16 +995,16 @@ mod test { ))), true, )] - .into(), + .into(), ), true, )] - .into(), + .into(), ), true, ), ] - .into(), + .into(), ), false, )), @@ -1021,7 +1028,7 @@ mod test { Field::new("e", DataType::Int32, true), Field::new("f", DataType::Utf8, true), ] - .into(), + .into(), ), true, ))), @@ -1075,7 +1082,7 @@ mod test { ))), true, )] - .into(), + .into(), ), true, ))], @@ -1141,7 +1148,7 @@ mod test { ("int_map_array", Arc::new(int_map_array_), true), ("nested_Struct", Arc::new(nested_struct), true), ]) - .unwrap(); + .unwrap(); let batch_large = read_file(&file, 8, false); assert_eq!(batch_large, expected, "Mismatch for batch_size=8"); let batch_small = read_file(&file, 3, false); @@ -1222,7 +1229,7 @@ mod test { ); assert_eq!(a_array.value(6), 7, "Mismatch in nested_struct.A at row 6"); } - + #[test] fn test_nullable_impala_strict() { let file = arrow_test_data("avro/nullable.impala.avro"); diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs index 01990072484b..1321277152a1 100644 --- a/arrow-avro/src/reader/record.rs +++ b/arrow-avro/src/reader/record.rs @@ -360,7 +360,7 @@ impl Decoder { Self::Nullable(order, null_buffer, inner) => { null_buffer.append(false); inner.append_null(); - }, + } } } @@ -449,25 +449,16 @@ impl Decoder { } Self::Nullable(order, nb, child) => { let branch = buf.get_int()?; - match order { - Nullability::NullFirst => { - if branch == 0 { - nb.append(false); - child.append_null(); - } else { - nb.append(true); - child.decode(buf)?; - } - } - Nullability::NullSecond => { - if branch == 0 { - nb.append(true); - child.decode(buf)?; - } else { - nb.append(false); - child.append_null(); - } - } + let is_not_null = match order { + Nullability::NullFirst => branch != 0, + Nullability::NullSecond => branch == 0, + }; + + nb.append(is_not_null); + if is_not_null { + child.decode(buf)?; + } else { + child.append_null(); } } } From efd7e31296914a83edf212319c82daa50e9c6c91 Mon Sep 17 00:00:00 2001 From: "veronica.manchola" Date: Fri, 18 Jul 2025 13:34:24 -0400 Subject: [PATCH 03/10] add strict mode and utf8view options to AvroFieldBuilder and enhance Avro schema parsing --- arrow-avro/src/codec.rs | 132 ++++++++++++++++++++++++++++++----- arrow-avro/src/reader/mod.rs | 7 +- 2 files changed, 120 insertions(+), 19 deletions(-) diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs index 88b30a6d49b4..008a5fadeeca 100644 --- a/arrow-avro/src/codec.rs +++ b/arrow-avro/src/codec.rs @@ -148,7 +148,7 @@ impl<'a> TryFrom<&Schema<'a>> for AvroField { match schema { Schema::Complex(ComplexType::Record(r)) => { let mut resolver = Resolver::default(); - let data_type = make_data_type(schema, None, &mut resolver, false)?; + let data_type = make_data_type(schema, None, &mut resolver, false, false)?; Ok(AvroField { data_type, name: r.name.to_string(), @@ -161,6 +161,66 @@ impl<'a> TryFrom<&Schema<'a>> for AvroField { } } +/// Builder for an [`AvroField`] +#[derive(Debug)] +pub struct AvroFieldBuilder<'a> { + schema: &'a Schema<'a>, + use_utf8view: bool, + strict_mode: bool, +} + +impl<'a> Default for AvroFieldBuilder<'a> { + fn default() -> Self { + panic!("AvroFieldBuilder requires a schema") + } +} + +impl<'a> AvroFieldBuilder<'a> { + /// Creates a new [`AvroFieldBuilder`] + pub fn new(schema: &'a Schema<'a>) -> Self { + Self { + schema, + use_utf8view: false, + strict_mode: false, + } + } + + /// Enable or disable Utf8View support + pub fn with_utf8view(mut self, use_utf8view: bool) -> Self { + self.use_utf8view = use_utf8view; + self + } + + /// Enable or disable strict mode. + pub fn with_strict_mode(mut self, strict_mode: bool) -> Self { + self.strict_mode = strict_mode; + self + } + + /// Build an [`AvroField`] from the builder + pub fn build(self) -> Result { + match self.schema { + Schema::Complex(ComplexType::Record(r)) => { + let mut resolver = Resolver::default(); + let data_type = make_data_type( + self.schema, + None, + &mut resolver, + self.use_utf8view, + self.strict_mode, + )?; + Ok(AvroField { + name: r.name.to_string(), + data_type, + }) + } + _ => Err(ArrowError::ParseError(format!( + "Expected a Record schema to build an AvroField, but got {:?}", + self.schema + ))), + } + } +} /// An Avro encoding /// /// @@ -409,6 +469,7 @@ fn make_data_type<'a>( namespace: Option<&'a str>, resolver: &mut Resolver<'a>, use_utf8view: bool, + strict_mode: bool, ) -> Result { match schema { Schema::TypeName(TypeName::Primitive(p)) => { @@ -428,12 +489,20 @@ fn make_data_type<'a>( .position(|x| x == &Schema::TypeName(TypeName::Primitive(PrimitiveType::Null))); match (f.len() == 2, null) { (true, Some(0)) => { - let mut field = make_data_type(&f[1], namespace, resolver, use_utf8view)?; + let mut field = + make_data_type(&f[1], namespace, resolver, use_utf8view, strict_mode)?; field.nullability = Some(Nullability::NullFirst); Ok(field) } (true, Some(1)) => { - let mut field = make_data_type(&f[0], namespace, resolver, use_utf8view)?; + if strict_mode { + return Err(ArrowError::SchemaError( + "Found Avro union of the form ['T','null'], which is disallowed in strict_mode" + .to_string(), + )); + } + let mut field = + make_data_type(&f[0], namespace, resolver, use_utf8view, strict_mode)?; field.nullability = Some(Nullability::NullSecond); Ok(field) } @@ -456,6 +525,7 @@ fn make_data_type<'a>( namespace, resolver, use_utf8view, + strict_mode, )?, }) }) @@ -469,8 +539,13 @@ fn make_data_type<'a>( Ok(field) } ComplexType::Array(a) => { - let mut field = - make_data_type(a.items.as_ref(), namespace, resolver, use_utf8view)?; + let mut field = make_data_type( + a.items.as_ref(), + namespace, + resolver, + use_utf8view, + strict_mode, + )?; Ok(AvroDataType { nullability: None, metadata: a.attributes.field_metadata(), @@ -535,7 +610,8 @@ fn make_data_type<'a>( Ok(field) } ComplexType::Map(m) => { - let val = make_data_type(&m.values, namespace, resolver, use_utf8view)?; + let val = + make_data_type(&m.values, namespace, resolver, use_utf8view, strict_mode)?; Ok(AvroDataType { nullability: None, metadata: m.attributes.field_metadata(), @@ -549,6 +625,7 @@ fn make_data_type<'a>( namespace, resolver, use_utf8view, + strict_mode, )?; // https://avro.apache.org/docs/1.11.1/specification/#logical-types @@ -630,7 +707,7 @@ mod tests { let schema = create_schema_with_logical_type(PrimitiveType::Int, "date"); let mut resolver = Resolver::default(); - let result = make_data_type(&schema, None, &mut resolver, false).unwrap(); + let result = make_data_type(&schema, None, &mut resolver, false, false).unwrap(); assert!(matches!(result.codec, Codec::Date32)); } @@ -640,7 +717,7 @@ mod tests { let schema = create_schema_with_logical_type(PrimitiveType::Int, "time-millis"); let mut resolver = Resolver::default(); - let result = make_data_type(&schema, None, &mut resolver, false).unwrap(); + let result = make_data_type(&schema, None, &mut resolver, false, false).unwrap(); assert!(matches!(result.codec, Codec::TimeMillis)); } @@ -650,7 +727,7 @@ mod tests { let schema = create_schema_with_logical_type(PrimitiveType::Long, "time-micros"); let mut resolver = Resolver::default(); - let result = make_data_type(&schema, None, &mut resolver, false).unwrap(); + let result = make_data_type(&schema, None, &mut resolver, false, false).unwrap(); assert!(matches!(result.codec, Codec::TimeMicros)); } @@ -660,7 +737,7 @@ mod tests { let schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-millis"); let mut resolver = Resolver::default(); - let result = make_data_type(&schema, None, &mut resolver, false).unwrap(); + let result = make_data_type(&schema, None, &mut resolver, false, false).unwrap(); assert!(matches!(result.codec, Codec::TimestampMillis(true))); } @@ -670,7 +747,7 @@ mod tests { let schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-micros"); let mut resolver = Resolver::default(); - let result = make_data_type(&schema, None, &mut resolver, false).unwrap(); + let result = make_data_type(&schema, None, &mut resolver, false, false).unwrap(); assert!(matches!(result.codec, Codec::TimestampMicros(true))); } @@ -680,7 +757,7 @@ mod tests { let schema = create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-millis"); let mut resolver = Resolver::default(); - let result = make_data_type(&schema, None, &mut resolver, false).unwrap(); + let result = make_data_type(&schema, None, &mut resolver, false, false).unwrap(); assert!(matches!(result.codec, Codec::TimestampMillis(false))); } @@ -690,7 +767,7 @@ mod tests { let schema = create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-micros"); let mut resolver = Resolver::default(); - let result = make_data_type(&schema, None, &mut resolver, false).unwrap(); + let result = make_data_type(&schema, None, &mut resolver, false, false).unwrap(); assert!(matches!(result.codec, Codec::TimestampMicros(false))); } @@ -745,7 +822,7 @@ mod tests { let schema = create_schema_with_logical_type(PrimitiveType::Int, "custom-type"); let mut resolver = Resolver::default(); - let result = make_data_type(&schema, None, &mut resolver, false).unwrap(); + let result = make_data_type(&schema, None, &mut resolver, false, false).unwrap(); assert_eq!( result.metadata.get("logicalType"), @@ -758,7 +835,7 @@ mod tests { let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String)); let mut resolver = Resolver::default(); - let result = make_data_type(&schema, None, &mut resolver, true).unwrap(); + let result = make_data_type(&schema, None, &mut resolver, true, false).unwrap(); assert!(matches!(result.codec, Codec::Utf8View)); } @@ -768,7 +845,7 @@ mod tests { let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String)); let mut resolver = Resolver::default(); - let result = make_data_type(&schema, None, &mut resolver, false).unwrap(); + let result = make_data_type(&schema, None, &mut resolver, false, false).unwrap(); assert!(matches!(result.codec, Codec::Utf8)); } @@ -796,7 +873,7 @@ mod tests { let schema = Schema::Complex(ComplexType::Record(record)); let mut resolver = Resolver::default(); - let result = make_data_type(&schema, None, &mut resolver, true).unwrap(); + let result = make_data_type(&schema, None, &mut resolver, true, false).unwrap(); if let Codec::Struct(fields) = &result.codec { let first_field_codec = &fields[0].data_type().codec; @@ -805,4 +882,25 @@ mod tests { panic!("Expected Struct codec"); } } + + #[test] + fn test_union_with_strict_mode() { + let schema = Schema::Union(vec![ + Schema::TypeName(TypeName::Primitive(PrimitiveType::String)), + Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)), + ]); + + let mut resolver = Resolver::default(); + let result = make_data_type(&schema, None, &mut resolver, false, true); + + assert!(result.is_err()); + match result { + Err(ArrowError::SchemaError(msg)) => { + assert!(msg.contains( + "Found Avro union of the form ['T','null'], which is disallowed in strict_mode" + )); + } + _ => panic!("Expected SchemaError"), + } + } } diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs index 82a835296de8..6dc6800debe7 100644 --- a/arrow-avro/src/reader/mod.rs +++ b/arrow-avro/src/reader/mod.rs @@ -86,7 +86,7 @@ //! ``` //! -use crate::codec::AvroField; +use crate::codec::AvroFieldBuilder; use crate::schema::Schema as AvroSchema; use arrow_array::{RecordBatch, RecordBatchReader}; use arrow_schema::{ArrowError, SchemaRef}; @@ -221,7 +221,10 @@ impl ReaderBuilder { } fn make_record_decoder(&self, schema: &AvroSchema<'_>) -> Result { - let root_field = AvroField::try_from(schema)?; + let root_field = AvroFieldBuilder::new(schema) + .with_utf8view(self.utf8_view) + .with_strict_mode(self.strict_mode) + .build()?; RecordDecoder::try_new_with_options( root_field.data_type(), self.utf8_view, From 15b58e3e015d119700b15ba0bb66917f97359f34 Mon Sep 17 00:00:00 2001 From: "veronica.manchola" Date: Fri, 18 Jul 2025 13:38:31 -0400 Subject: [PATCH 04/10] remove unnecesary check --- arrow-avro/src/reader/record.rs | 20 +++----------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs index 1321277152a1..e6e6aa5260c3 100644 --- a/arrow-avro/src/reader/record.rs +++ b/arrow-avro/src/reader/record.rs @@ -301,23 +301,9 @@ impl Decoder { } Codec::Uuid => Self::Uuid(Vec::with_capacity(DEFAULT_CAPACITY)), }; - let union_order = match data_type.nullability() { - None => None, - Some(Nullability::NullFirst) => Some(Nullability::NullFirst), - Some(Nullability::NullSecond) => { - if strict_mode { - return Err(ArrowError::ParseError( - "Found Avro union of the form ['T','null'], which is disallowed in strict_mode" - .to_string(), - )); - } - Some(Nullability::NullSecond) - } - }; - - Ok(match union_order { - Some(order) => Decoder::Nullable( - order, + Ok(match data_type.nullability() { + Some(nullability) => Self::Nullable( + nullability, NullBufferBuilder::new(DEFAULT_CAPACITY), Box::new(decoder), ), From 6b092a131ba05d5b840579167ac37b170be2dbcb Mon Sep 17 00:00:00 2001 From: "veronica.manchola" Date: Fri, 18 Jul 2025 13:48:39 -0400 Subject: [PATCH 05/10] remove strict_mode option from RecordDecoder and related methods --- arrow-avro/src/reader/mod.rs | 6 +--- arrow-avro/src/reader/record.rs | 57 +++++++++++++-------------------- 2 files changed, 24 insertions(+), 39 deletions(-) diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs index 6dc6800debe7..116e04b70310 100644 --- a/arrow-avro/src/reader/mod.rs +++ b/arrow-avro/src/reader/mod.rs @@ -225,11 +225,7 @@ impl ReaderBuilder { .with_utf8view(self.utf8_view) .with_strict_mode(self.strict_mode) .build()?; - RecordDecoder::try_new_with_options( - root_field.data_type(), - self.utf8_view, - self.strict_mode, - ) + RecordDecoder::try_new_with_options(root_field.data_type(), self.utf8_view) } fn build_impl(self, reader: &mut R) -> Result<(Header, Decoder), ArrowError> { diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs index e6e6aa5260c3..695bdf491af7 100644 --- a/arrow-avro/src/reader/record.rs +++ b/arrow-avro/src/reader/record.rs @@ -43,7 +43,6 @@ const DEFAULT_CAPACITY: usize = 1024; pub(crate) struct RecordDecoderBuilder<'a> { data_type: &'a AvroDataType, use_utf8view: bool, - strict_mode: bool, } impl<'a> RecordDecoderBuilder<'a> { @@ -51,7 +50,6 @@ impl<'a> RecordDecoderBuilder<'a> { Self { data_type, use_utf8view: false, - strict_mode: false, } } @@ -60,14 +58,9 @@ impl<'a> RecordDecoderBuilder<'a> { self } - pub(crate) fn with_strict_mode(mut self, strict_mode: bool) -> Self { - self.strict_mode = strict_mode; - self - } - /// Builds the `RecordDecoder`. pub(crate) fn build(self) -> Result { - RecordDecoder::try_new_with_options(self.data_type, self.use_utf8view, self.strict_mode) + RecordDecoder::try_new_with_options(self.data_type, self.use_utf8view) } } @@ -77,7 +70,6 @@ pub(crate) struct RecordDecoder { schema: SchemaRef, fields: Vec, use_utf8view: bool, - strict_mode: bool, } impl RecordDecoder { @@ -90,7 +82,6 @@ impl RecordDecoder { pub(crate) fn try_new(data_type: &AvroDataType) -> Result { RecordDecoderBuilder::new(data_type) .with_utf8_view(true) - .with_strict_mode(true) .build() } @@ -109,14 +100,12 @@ impl RecordDecoder { pub(crate) fn try_new_with_options( data_type: &AvroDataType, use_utf8view: bool, - strict_mode: bool, ) -> Result { - match Decoder::try_new(data_type, strict_mode)? { + match Decoder::try_new(data_type)? { Decoder::Record(fields, encodings) => Ok(Self { schema: Arc::new(ArrowSchema::new(fields)), fields: encodings, use_utf8view, - strict_mode, }), encoding => Err(ArrowError::ParseError(format!( "Expected record got {encoding:?}" @@ -189,7 +178,7 @@ enum Decoder { } impl Decoder { - fn try_new(data_type: &AvroDataType, strict_mode: bool) -> Result { + fn try_new(data_type: &AvroDataType) -> Result { let decoder = match data_type.codec() { Codec::Null => Self::Null(0), Codec::Boolean => Self::Boolean(BooleanBufferBuilder::new(DEFAULT_CAPACITY)), @@ -260,7 +249,7 @@ impl Decoder { } Codec::Interval => Self::Duration(IntervalMonthDayNanoBuilder::new()), Codec::List(item) => { - let decoder = Self::try_new(item, strict_mode)?; + let decoder = Self::try_new(item)?; Self::Array( Arc::new(item.field_with_name("item")), OffsetBufferBuilder::new(DEFAULT_CAPACITY), @@ -274,7 +263,7 @@ impl Decoder { let mut arrow_fields = Vec::with_capacity(fields.len()); let mut encodings = Vec::with_capacity(fields.len()); for avro_field in fields.iter() { - let encoding = Self::try_new(avro_field.data_type(), strict_mode)?; + let encoding = Self::try_new(avro_field.data_type())?; arrow_fields.push(avro_field.field()); encodings.push(encoding); } @@ -290,7 +279,7 @@ impl Decoder { ])), false, )); - let val_dec = Self::try_new(child, strict_mode)?; + let val_dec = Self::try_new(child)?; Self::Map( map_field, OffsetBufferBuilder::new(DEFAULT_CAPACITY), @@ -725,7 +714,7 @@ mod tests { fn test_map_decoding_one_entry() { let value_type = avro_from_codec(Codec::Utf8); let map_type = avro_from_codec(Codec::Map(Arc::new(value_type))); - let mut decoder = Decoder::try_new(&map_type, false).unwrap(); + let mut decoder = Decoder::try_new(&map_type).unwrap(); // Encode a single map with one entry: {"hello": "world"} let mut data = Vec::new(); data.extend_from_slice(&encode_avro_long(1)); @@ -761,7 +750,7 @@ mod tests { fn test_map_decoding_empty() { let value_type = avro_from_codec(Codec::Utf8); let map_type = avro_from_codec(Codec::Map(Arc::new(value_type))); - let mut decoder = Decoder::try_new(&map_type, false).unwrap(); + let mut decoder = Decoder::try_new(&map_type).unwrap(); let data = encode_avro_long(0); decoder.decode(&mut AvroCursor::new(&data)).unwrap(); let array = decoder.flush(None).unwrap(); @@ -773,7 +762,7 @@ mod tests { #[test] fn test_fixed_decoding() { let avro_type = avro_from_codec(Codec::Fixed(3)); - let mut decoder = Decoder::try_new(&avro_type, false).expect("Failed to create decoder"); + let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder"); let data1 = [1u8, 2, 3]; let mut cursor1 = AvroCursor::new(&data1); @@ -813,7 +802,7 @@ mod tests { #[test] fn test_fixed_decoding_empty() { let avro_type = avro_from_codec(Codec::Fixed(5)); - let mut decoder = Decoder::try_new(&avro_type, false).expect("Failed to create decoder"); + let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder"); let array = decoder .flush(None) @@ -835,7 +824,7 @@ mod tests { #[test] fn test_uuid_decoding() { let avro_type = avro_from_codec(Codec::Uuid); - let mut decoder = Decoder::try_new(&avro_type, false).expect("Failed to create decoder"); + let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder"); let uuid_str = "f81d4fae-7dec-11d0-a765-00a0c91e6bf6"; let data = encode_avro_bytes(uuid_str.as_bytes()); let mut cursor = AvroCursor::new(&data); @@ -863,7 +852,7 @@ mod tests { fn test_array_decoding() { let item_dt = avro_from_codec(Codec::Int32); let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt))); - let mut decoder = Decoder::try_new(&list_dt, false).unwrap(); + let mut decoder = Decoder::try_new(&list_dt).unwrap(); let mut row1 = Vec::new(); row1.extend_from_slice(&encode_avro_long(2)); row1.extend_from_slice(&encode_avro_int(10)); @@ -890,7 +879,7 @@ mod tests { fn test_array_decoding_with_negative_block_count() { let item_dt = avro_from_codec(Codec::Int32); let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt))); - let mut decoder = Decoder::try_new(&list_dt, false).unwrap(); + let mut decoder = Decoder::try_new(&list_dt).unwrap(); let mut data = encode_avro_long(-3); data.extend_from_slice(&encode_avro_long(12)); data.extend_from_slice(&encode_avro_int(1)); @@ -914,7 +903,7 @@ mod tests { fn test_nested_array_decoding() { let inner_ty = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32)))); let nested_ty = avro_from_codec(Codec::List(Arc::new(inner_ty.clone()))); - let mut decoder = Decoder::try_new(&nested_ty, false).unwrap(); + let mut decoder = Decoder::try_new(&nested_ty).unwrap(); let mut buf = Vec::new(); buf.extend(encode_avro_long(1)); buf.extend(encode_avro_long(2)); @@ -943,7 +932,7 @@ mod tests { fn test_array_decoding_empty_array() { let value_type = avro_from_codec(Codec::Utf8); let map_type = avro_from_codec(Codec::List(Arc::new(value_type))); - let mut decoder = Decoder::try_new(&map_type, false).unwrap(); + let mut decoder = Decoder::try_new(&map_type).unwrap(); let data = encode_avro_long(0); decoder.decode(&mut AvroCursor::new(&data)).unwrap(); let array = decoder.flush(None).unwrap(); @@ -955,7 +944,7 @@ mod tests { #[test] fn test_decimal_decoding_fixed256() { let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(32))); - let mut decoder = Decoder::try_new(&dt, false).unwrap(); + let mut decoder = Decoder::try_new(&dt).unwrap(); let row1 = [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, @@ -982,7 +971,7 @@ mod tests { #[test] fn test_decimal_decoding_fixed128() { let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(16))); - let mut decoder = Decoder::try_new(&dt, false).unwrap(); + let mut decoder = Decoder::try_new(&dt).unwrap(); let row1 = [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x30, 0x39, @@ -1007,7 +996,7 @@ mod tests { #[test] fn test_decimal_decoding_bytes_with_nulls() { let dt = avro_from_codec(Codec::Decimal(4, Some(1), None)); - let inner = Decoder::try_new(&dt, false).unwrap(); + let inner = Decoder::try_new(&dt).unwrap(); let mut decoder = Decoder::Nullable( Nullability::NullSecond, NullBufferBuilder::new(DEFAULT_CAPACITY), @@ -1036,7 +1025,7 @@ mod tests { #[test] fn test_decimal_decoding_bytes_with_nulls_fixed_size() { let dt = avro_from_codec(Codec::Decimal(6, Some(2), Some(16))); - let inner = Decoder::try_new(&dt, false).unwrap(); + let inner = Decoder::try_new(&dt).unwrap(); let mut decoder = Decoder::Nullable( Nullability::NullSecond, NullBufferBuilder::new(DEFAULT_CAPACITY), @@ -1074,7 +1063,7 @@ mod tests { fn test_enum_decoding() { let symbols: Arc<[String]> = vec!["A", "B", "C"].into_iter().map(String::from).collect(); let avro_type = avro_from_codec(Codec::Enum(symbols.clone())); - let mut decoder = Decoder::try_new(&avro_type, false).unwrap(); + let mut decoder = Decoder::try_new(&avro_type).unwrap(); let mut data = Vec::new(); data.extend_from_slice(&encode_avro_int(2)); data.extend_from_slice(&encode_avro_int(0)); @@ -1107,7 +1096,7 @@ mod tests { let enum_codec = Codec::Enum(symbols.clone()); let avro_type = AvroDataType::new(enum_codec, Default::default(), Some(Nullability::NullFirst)); - let mut decoder = Decoder::try_new(&avro_type, true).unwrap(); + let mut decoder = Decoder::try_new(&avro_type).unwrap(); let mut data = Vec::new(); data.extend_from_slice(&encode_avro_long(1)); data.extend_from_slice(&encode_avro_int(1)); @@ -1146,7 +1135,7 @@ mod tests { Default::default(), Some(Nullability::NullFirst), ); - let mut decoder = Decoder::try_new(&avro_type, false).unwrap(); + let mut decoder = Decoder::try_new(&avro_type).unwrap(); let mut data = Vec::new(); // First value: 1 month, 2 days, 3 millis data.extend_from_slice(&encode_avro_long(1)); // not null @@ -1196,7 +1185,7 @@ mod tests { fn test_duration_decoding_empty() { let duration_codec = Codec::Interval; let avro_type = AvroDataType::new(duration_codec, Default::default(), None); - let mut decoder = Decoder::try_new(&avro_type, false).unwrap(); + let mut decoder = Decoder::try_new(&avro_type).unwrap(); let array = decoder.flush(None).unwrap(); assert_eq!(array.len(), 0); } From 55a9c4eb13117a4eeda27a1771e5b04582abb8ea Mon Sep 17 00:00:00 2001 From: Veronica Manchola Date: Fri, 18 Jul 2025 15:14:12 -0400 Subject: [PATCH 06/10] Update arrow-avro/src/reader/record.rs Co-authored-by: Connor Sanders --- arrow-avro/src/reader/record.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs index 695bdf491af7..023a0756bb60 100644 --- a/arrow-avro/src/reader/record.rs +++ b/arrow-avro/src/reader/record.rs @@ -422,18 +422,17 @@ impl Decoder { let nanos = (millis as i64) * 1_000_000; builder.append_value(IntervalMonthDayNano::new(months as i32, days as i32, nanos)); } - Self::Nullable(order, nb, child) => { - let branch = buf.get_int()?; - let is_not_null = match order { + Self::Nullable(order, nb, encoding) => { + let branch = buf.read_vlq()?; + let is_not_null = match *order { Nullability::NullFirst => branch != 0, Nullability::NullSecond => branch == 0, }; - nb.append(is_not_null); if is_not_null { - child.decode(buf)?; + encoding.decode(buf)?; } else { - child.append_null(); + encoding.append_null(); } } } From aded7ee379ee3dddd18abf02a9a2ea8e4ce5fcb7 Mon Sep 17 00:00:00 2001 From: Veronica Manchola Date: Fri, 18 Jul 2025 15:15:18 -0400 Subject: [PATCH 07/10] Update arrow-avro/src/reader/record.rs Co-authored-by: Connor Sanders --- arrow-avro/src/reader/record.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs index 023a0756bb60..180afcd2d8c3 100644 --- a/arrow-avro/src/reader/record.rs +++ b/arrow-avro/src/reader/record.rs @@ -332,7 +332,7 @@ impl Decoder { Self::Decimal256(_, _, _, builder) => builder.append_value(i256::ZERO), Self::Enum(indices, _) => indices.push(0), Self::Duration(builder) => builder.append_null(), - Self::Nullable(order, null_buffer, inner) => { + Self::Nullable(_, null_buffer, inner) => { null_buffer.append(false); inner.append_null(); } From 87cd5cb077e05319173a1fbf4419ef0ef9f6bb9e Mon Sep 17 00:00:00 2001 From: Veronica Manchola Date: Fri, 18 Jul 2025 15:33:33 -0400 Subject: [PATCH 08/10] Update arrow-avro/src/codec.rs Co-authored-by: Connor Sanders --- arrow-avro/src/codec.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs index 008a5fadeeca..f749399f99d3 100644 --- a/arrow-avro/src/codec.rs +++ b/arrow-avro/src/codec.rs @@ -169,11 +169,6 @@ pub struct AvroFieldBuilder<'a> { strict_mode: bool, } -impl<'a> Default for AvroFieldBuilder<'a> { - fn default() -> Self { - panic!("AvroFieldBuilder requires a schema") - } -} impl<'a> AvroFieldBuilder<'a> { /// Creates a new [`AvroFieldBuilder`] From 654dc9b101d44f7a7b4b9fbfb590e2500110d804 Mon Sep 17 00:00:00 2001 From: Connor Sanders Date: Fri, 18 Jul 2025 14:45:17 -0500 Subject: [PATCH 09/10] lint --- arrow-avro/src/codec.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs index f749399f99d3..bd265503d755 100644 --- a/arrow-avro/src/codec.rs +++ b/arrow-avro/src/codec.rs @@ -169,7 +169,6 @@ pub struct AvroFieldBuilder<'a> { strict_mode: bool, } - impl<'a> AvroFieldBuilder<'a> { /// Creates a new [`AvroFieldBuilder`] pub fn new(schema: &'a Schema<'a>) -> Self { From 0283d63e2e1bc075a80a07d3414eff66c1952ee1 Mon Sep 17 00:00:00 2001 From: "veronica.manchola" Date: Fri, 18 Jul 2025 16:58:25 -0400 Subject: [PATCH 10/10] address clippy error --- arrow-avro/src/reader/mod.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs index 116e04b70310..3bc7d94b7c4c 100644 --- a/arrow-avro/src/reader/mod.rs +++ b/arrow-avro/src/reader/mod.rs @@ -1178,12 +1178,7 @@ mod test { .expect("id column should be an Int64Array"); let expected_ids = [1, 2, 3, 4, 5, 6, 7]; for (i, &expected_id) in expected_ids.iter().enumerate() { - assert_eq!( - id_array.value(i), - expected_id, - "Mismatch in id at row {}", - i - ); + assert_eq!(id_array.value(i), expected_id, "Mismatch in id at row {i}",); } let int_array = batch .column(1)