diff --git a/arrow-json/src/reader/boolean_array.rs b/arrow-json/src/reader/boolean_array.rs index 9094391cd7dd..4e933fd877be 100644 --- a/arrow-json/src/reader/boolean_array.rs +++ b/arrow-json/src/reader/boolean_array.rs @@ -24,7 +24,16 @@ use crate::reader::tape::{Tape, TapeElement}; use crate::reader::ArrayDecoder; #[derive(Default)] -pub struct BooleanArrayDecoder {} +pub struct BooleanArrayDecoder { + ignore_type_conflicts: bool, +} +impl BooleanArrayDecoder { + pub fn new(ignore_type_conflicts: bool) -> Self { + Self { + ignore_type_conflicts, + } + } +} impl ArrayDecoder for BooleanArrayDecoder { fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { @@ -34,6 +43,7 @@ impl ArrayDecoder for BooleanArrayDecoder { TapeElement::Null => builder.append_null(), TapeElement::True => builder.append_value(true), TapeElement::False => builder.append_value(false), + _ if self.ignore_type_conflicts => builder.append_null(), _ => return Err(tape.error(*p, "boolean")), } } diff --git a/arrow-json/src/reader/decimal_array.rs b/arrow-json/src/reader/decimal_array.rs index d56afcfe807a..83b12365e64a 100644 --- a/arrow-json/src/reader/decimal_array.rs +++ b/arrow-json/src/reader/decimal_array.rs @@ -30,15 +30,17 @@ use crate::reader::ArrayDecoder; pub struct DecimalArrayDecoder { precision: u8, scale: i8, + ignore_type_conflicts: bool, // Invariant and Send phantom: PhantomData D>, } impl DecimalArrayDecoder { - pub fn new(precision: u8, scale: i8) -> Self { + pub fn new(precision: u8, scale: i8, ignore_type_conflicts: bool) -> Self { Self { precision, scale, + ignore_type_conflicts, phantom: PhantomData, } } @@ -51,45 +53,40 @@ where fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { let mut builder = PrimitiveBuilder::::with_capacity(pos.len()); + // Factor out this logic to simplify call sites below; the compiler will inline it, + // producing a highly predictable branch whose cost should be trivial compared to the + // expensive and unpredictably 
branchy string parse that immediately precedes each call. + let append = |builder: &mut PrimitiveBuilder, value: &str| { + match parse_decimal::(value, self.precision, self.scale) { + Ok(value) => builder.append_value(value), + Err(_) if self.ignore_type_conflicts => builder.append_null(), + Err(e) => return Err(e), + } + Ok(()) + }; + for p in pos { match tape.get(*p) { TapeElement::Null => builder.append_null(), - TapeElement::String(idx) => { - let s = tape.get_string(idx); - let value = parse_decimal::(s, self.precision, self.scale)?; - builder.append_value(value) - } - TapeElement::Number(idx) => { - let s = tape.get_string(idx); - let value = parse_decimal::(s, self.precision, self.scale)?; - builder.append_value(value) - } + TapeElement::String(idx) => append(&mut builder, tape.get_string(idx))?, + TapeElement::Number(idx) => append(&mut builder, tape.get_string(idx))?, TapeElement::I64(high) => match tape.get(*p + 1) { TapeElement::I32(low) => { let val = (((high as i64) << 32) | (low as u32) as i64).to_string(); - let value = parse_decimal::(&val, self.precision, self.scale)?; - builder.append_value(value) + append(&mut builder, &val)? } _ => unreachable!(), }, - TapeElement::I32(val) => { - let s = val.to_string(); - let value = parse_decimal::(&s, self.precision, self.scale)?; - builder.append_value(value) - } + TapeElement::I32(val) => append(&mut builder, &val.to_string())?, TapeElement::F64(high) => match tape.get(*p + 1) { TapeElement::F32(low) => { let val = f64::from_bits(((high as u64) << 32) | low as u64).to_string(); - let value = parse_decimal::(&val, self.precision, self.scale)?; - builder.append_value(value) + append(&mut builder, &val)? 
} _ => unreachable!(), }, - TapeElement::F32(val) => { - let s = f32::from_bits(val).to_string(); - let value = parse_decimal::(&s, self.precision, self.scale)?; - builder.append_value(value) - } + TapeElement::F32(val) => append(&mut builder, &f32::from_bits(val).to_string())?, + _ if self.ignore_type_conflicts => builder.append_null(), _ => return Err(tape.error(*p, "decimal")), } } diff --git a/arrow-json/src/reader/list_array.rs b/arrow-json/src/reader/list_array.rs index 1a1dee6a23d4..7c05cffa6c76 100644 --- a/arrow-json/src/reader/list_array.rs +++ b/arrow-json/src/reader/list_array.rs @@ -29,6 +29,7 @@ pub struct ListArrayDecoder { data_type: DataType, decoder: Box, phantom: PhantomData, + ignore_type_conflicts: bool, is_nullable: bool, } @@ -37,6 +38,7 @@ impl ListArrayDecoder { data_type: DataType, coerce_primitive: bool, strict_mode: bool, + ignore_type_conflicts: bool, is_nullable: bool, struct_mode: StructMode, ) -> Result { @@ -49,6 +51,7 @@ impl ListArrayDecoder { field.data_type().clone(), coerce_primitive, strict_mode, + ignore_type_conflicts, field.is_nullable(), struct_mode, )?; @@ -57,6 +60,7 @@ impl ListArrayDecoder { data_type, decoder, phantom: Default::default(), + ignore_type_conflicts, is_nullable, }) } @@ -83,6 +87,10 @@ impl ArrayDecoder for ListArrayDecoder { nulls.append(false); *p + 1 } + (_, Some(nulls)) if self.ignore_type_conflicts => { + nulls.append(false); + *p + 1 + } _ => return Err(tape.error(*p, "[")), }; diff --git a/arrow-json/src/reader/map_array.rs b/arrow-json/src/reader/map_array.rs index ee78373a551e..b9b131ac134f 100644 --- a/arrow-json/src/reader/map_array.rs +++ b/arrow-json/src/reader/map_array.rs @@ -28,6 +28,7 @@ pub struct MapArrayDecoder { data_type: DataType, keys: Box, values: Box, + ignore_type_conflicts: bool, is_nullable: bool, } @@ -36,6 +37,7 @@ impl MapArrayDecoder { data_type: DataType, coerce_primitive: bool, strict_mode: bool, + ignore_type_conflicts: bool, is_nullable: bool, struct_mode: StructMode, 
) -> Result { @@ -60,6 +62,7 @@ impl MapArrayDecoder { fields[0].data_type().clone(), coerce_primitive, strict_mode, + ignore_type_conflicts, fields[0].is_nullable(), struct_mode, )?; @@ -67,6 +70,7 @@ impl MapArrayDecoder { fields[1].data_type().clone(), coerce_primitive, strict_mode, + ignore_type_conflicts, fields[1].is_nullable(), struct_mode, )?; @@ -75,6 +79,7 @@ impl MapArrayDecoder { data_type, keys, values, + ignore_type_conflicts, is_nullable, }) } @@ -111,6 +116,10 @@ impl ArrayDecoder for MapArrayDecoder { nulls.append(false); p + 1 } + (_, Some(nulls)) if self.ignore_type_conflicts => { + nulls.append(false); + p + 1 + } _ => return Err(tape.error(p, "{")), }; diff --git a/arrow-json/src/reader/mod.rs b/arrow-json/src/reader/mod.rs index cd33e337be08..1938bd7e818d 100644 --- a/arrow-json/src/reader/mod.rs +++ b/arrow-json/src/reader/mod.rs @@ -178,6 +178,7 @@ pub struct ReaderBuilder { batch_size: usize, coerce_primitive: bool, strict_mode: bool, + ignore_type_conflicts: bool, is_field: bool, struct_mode: StructMode, @@ -198,6 +199,7 @@ impl ReaderBuilder { batch_size: 1024, coerce_primitive: false, strict_mode: false, + ignore_type_conflicts: false, is_field: false, struct_mode: Default::default(), schema, @@ -239,6 +241,7 @@ impl ReaderBuilder { batch_size: 1024, coerce_primitive: false, strict_mode: false, + ignore_type_conflicts: false, is_field: true, struct_mode: Default::default(), schema: Arc::new(Schema::new([field.into()])), @@ -281,6 +284,18 @@ impl ReaderBuilder { } } + /// Sets whether the decoder should produce NULL instead of returning an error if it encounters + /// a type conflict on a nullable column (effectively treating it as a non-existent column). + /// + /// NOTE: The inferred NULL on type conflict will still produce errors for non-nullable columns, + /// the same as any other NULL or missing value. 
+ pub fn with_ignore_type_conflicts(self, ignore_type_conflicts: bool) -> Self { + Self { + ignore_type_conflicts, + ..self + } + } + /// Create a [`Reader`] with the provided [`BufRead`] pub fn build(self, reader: R) -> Result, ArrowError> { Ok(Reader { @@ -303,6 +318,7 @@ impl ReaderBuilder { data_type, self.coerce_primitive, self.strict_mode, + self.ignore_type_conflicts, nullable, self.struct_mode, )?; @@ -674,8 +690,11 @@ trait ArrayDecoder: Send { } macro_rules! primitive_decoder { - ($t:ty, $data_type:expr) => { - Ok(Box::new(PrimitiveArrayDecoder::<$t>::new($data_type))) + ($t:ty, $data_type:expr, $ignore_type_conflicts:expr) => { + Ok(Box::new(PrimitiveArrayDecoder::<$t>::new( + $data_type, + $ignore_type_conflicts, + ))) }; } @@ -683,82 +702,85 @@ fn make_decoder( data_type: DataType, coerce_primitive: bool, strict_mode: bool, + ignore_type_conflicts: bool, is_nullable: bool, struct_mode: StructMode, ) -> Result, ArrowError> { downcast_integer! { - data_type => (primitive_decoder, data_type), - DataType::Null => Ok(Box::::default()), - DataType::Float16 => primitive_decoder!(Float16Type, data_type), - DataType::Float32 => primitive_decoder!(Float32Type, data_type), - DataType::Float64 => primitive_decoder!(Float64Type, data_type), + data_type => (primitive_decoder, data_type, ignore_type_conflicts), + DataType::Null => Ok(Box::new(NullArrayDecoder::new(ignore_type_conflicts))), + DataType::Float16 => primitive_decoder!(Float16Type, data_type, ignore_type_conflicts), + DataType::Float32 => primitive_decoder!(Float32Type, data_type, ignore_type_conflicts), + DataType::Float64 => primitive_decoder!(Float64Type, data_type, ignore_type_conflicts), DataType::Timestamp(TimeUnit::Second, None) => { - Ok(Box::new(TimestampArrayDecoder::::new(data_type, Utc))) + Ok(Box::new(TimestampArrayDecoder::::new(data_type, Utc, ignore_type_conflicts))) }, DataType::Timestamp(TimeUnit::Millisecond, None) => { - Ok(Box::new(TimestampArrayDecoder::::new(data_type, Utc))) + 
Ok(Box::new(TimestampArrayDecoder::::new(data_type, Utc, ignore_type_conflicts))) }, DataType::Timestamp(TimeUnit::Microsecond, None) => { - Ok(Box::new(TimestampArrayDecoder::::new(data_type, Utc))) + Ok(Box::new(TimestampArrayDecoder::::new(data_type, Utc, ignore_type_conflicts))) }, DataType::Timestamp(TimeUnit::Nanosecond, None) => { - Ok(Box::new(TimestampArrayDecoder::::new(data_type, Utc))) + Ok(Box::new(TimestampArrayDecoder::::new(data_type, Utc, ignore_type_conflicts))) }, DataType::Timestamp(TimeUnit::Second, Some(ref tz)) => { let tz: Tz = tz.parse()?; - Ok(Box::new(TimestampArrayDecoder::::new(data_type, tz))) + Ok(Box::new(TimestampArrayDecoder::::new(data_type, tz, ignore_type_conflicts))) }, DataType::Timestamp(TimeUnit::Millisecond, Some(ref tz)) => { let tz: Tz = tz.parse()?; - Ok(Box::new(TimestampArrayDecoder::::new(data_type, tz))) + Ok(Box::new(TimestampArrayDecoder::::new(data_type, tz, ignore_type_conflicts))) }, DataType::Timestamp(TimeUnit::Microsecond, Some(ref tz)) => { let tz: Tz = tz.parse()?; - Ok(Box::new(TimestampArrayDecoder::::new(data_type, tz))) + Ok(Box::new(TimestampArrayDecoder::::new(data_type, tz, ignore_type_conflicts))) }, DataType::Timestamp(TimeUnit::Nanosecond, Some(ref tz)) => { let tz: Tz = tz.parse()?; - Ok(Box::new(TimestampArrayDecoder::::new(data_type, tz))) + Ok(Box::new(TimestampArrayDecoder::::new(data_type, tz, ignore_type_conflicts))) }, - DataType::Date32 => primitive_decoder!(Date32Type, data_type), - DataType::Date64 => primitive_decoder!(Date64Type, data_type), - DataType::Time32(TimeUnit::Second) => primitive_decoder!(Time32SecondType, data_type), - DataType::Time32(TimeUnit::Millisecond) => primitive_decoder!(Time32MillisecondType, data_type), - DataType::Time64(TimeUnit::Microsecond) => primitive_decoder!(Time64MicrosecondType, data_type), - DataType::Time64(TimeUnit::Nanosecond) => primitive_decoder!(Time64NanosecondType, data_type), - DataType::Duration(TimeUnit::Nanosecond) => 
primitive_decoder!(DurationNanosecondType, data_type), - DataType::Duration(TimeUnit::Microsecond) => primitive_decoder!(DurationMicrosecondType, data_type), - DataType::Duration(TimeUnit::Millisecond) => primitive_decoder!(DurationMillisecondType, data_type), - DataType::Duration(TimeUnit::Second) => primitive_decoder!(DurationSecondType, data_type), - DataType::Decimal128(p, s) => Ok(Box::new(DecimalArrayDecoder::::new(p, s))), - DataType::Decimal256(p, s) => Ok(Box::new(DecimalArrayDecoder::::new(p, s))), - DataType::Boolean => Ok(Box::::default()), - DataType::Utf8 => Ok(Box::new(StringArrayDecoder::::new(coerce_primitive))), - DataType::Utf8View => Ok(Box::new(StringViewArrayDecoder::new(coerce_primitive))), - DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::::new(coerce_primitive))), - DataType::List(_) => Ok(Box::new(ListArrayDecoder::::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)), - DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)), - DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)), + DataType::Date32 => primitive_decoder!(Date32Type, data_type, ignore_type_conflicts), + DataType::Date64 => primitive_decoder!(Date64Type, data_type, ignore_type_conflicts), + DataType::Time32(TimeUnit::Second) => primitive_decoder!(Time32SecondType, data_type, ignore_type_conflicts), + DataType::Time32(TimeUnit::Millisecond) => primitive_decoder!(Time32MillisecondType, data_type, ignore_type_conflicts), + DataType::Time64(TimeUnit::Microsecond) => primitive_decoder!(Time64MicrosecondType, data_type, ignore_type_conflicts), + DataType::Time64(TimeUnit::Nanosecond) => primitive_decoder!(Time64NanosecondType, data_type, ignore_type_conflicts), + DataType::Duration(TimeUnit::Nanosecond) => primitive_decoder!(DurationNanosecondType, data_type, ignore_type_conflicts), + 
DataType::Duration(TimeUnit::Microsecond) => primitive_decoder!(DurationMicrosecondType, data_type, ignore_type_conflicts), + DataType::Duration(TimeUnit::Millisecond) => primitive_decoder!(DurationMillisecondType, data_type, ignore_type_conflicts), + DataType::Duration(TimeUnit::Second) => primitive_decoder!(DurationSecondType, data_type, ignore_type_conflicts), + DataType::Decimal128(p, s) => Ok(Box::new(DecimalArrayDecoder::::new(p, s, ignore_type_conflicts))), + DataType::Decimal256(p, s) => Ok(Box::new(DecimalArrayDecoder::::new(p, s, ignore_type_conflicts))), + DataType::Boolean => Ok(Box::new(BooleanArrayDecoder::new(ignore_type_conflicts))), + DataType::Utf8 => Ok(Box::new(StringArrayDecoder::::new(coerce_primitive, ignore_type_conflicts))), + DataType::Utf8View => Ok(Box::new(StringViewArrayDecoder::new(coerce_primitive, ignore_type_conflicts))), + DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::::new(coerce_primitive, ignore_type_conflicts))), + DataType::List(_) => Ok(Box::new(ListArrayDecoder::::new(data_type, coerce_primitive, strict_mode, ignore_type_conflicts, is_nullable, struct_mode)?)), + DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::::new(data_type, coerce_primitive, strict_mode, ignore_type_conflicts, is_nullable, struct_mode)?)), + DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type, coerce_primitive, strict_mode, ignore_type_conflicts, is_nullable, struct_mode)?)), DataType::Binary | DataType::LargeBinary | DataType::FixedSizeBinary(_) => { Err(ArrowError::JsonError(format!("{data_type} is not supported by JSON"))) } - DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)), + DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(data_type, coerce_primitive, strict_mode, ignore_type_conflicts, is_nullable, struct_mode)?)), d => Err(ArrowError::NotYetImplemented(format!("Support for {d} in JSON reader"))) } } #[cfg(test)] mod tests { - use 
serde_json::json; - use std::fs::File; - use std::io::{BufReader, Cursor, Seek}; - use arrow_array::cast::AsArray; - use arrow_array::{Array, BooleanArray, Float64Array, ListArray, StringArray, StringViewArray}; - use arrow_buffer::{ArrowNativeType, Buffer}; + use arrow_array::{ + Array, BooleanArray, Float64Array, Int32Array, ListArray, MapArray, NullArray, StringArray, + StringViewArray, + }; + use arrow_buffer::{ArrowNativeType, Buffer, NullBuffer}; use arrow_cast::display::{ArrayFormatter, FormatOptions}; use arrow_data::ArrayDataBuilder; use arrow_schema::{Field, Fields}; + use serde_json::json; + use std::fs::File; + use std::io::{BufReader, Cursor, Seek}; use super::*; @@ -2808,4 +2830,354 @@ mod tests { "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned() ); } + + #[test] + fn test_type_conflict_nulls() { + let schema = Schema::new(vec![ + Field::new("null", DataType::Null, true), + Field::new("bool", DataType::Boolean, true), + Field::new("primitive", DataType::Int32, true), + Field::new("numeric", DataType::Decimal128(10, 3), true), + Field::new("string", DataType::Utf8, true), + Field::new("string_view", DataType::Utf8View, true), + Field::new( + "timestamp", + DataType::Timestamp(TimeUnit::Second, None), + true, + ), + Field::new( + "array", + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), + true, + ), + Field::new( + "map", + DataType::Map( + Arc::new(Field::new( + "entries", + DataType::Struct(Fields::from(vec![ + Field::new("keys", DataType::Utf8, false), + Field::new("values", DataType::Utf8, true), + ])), + false, // not nullable + )), + false, // not sorted + ), + true, // nullable + ), + Field::new( + "struct", + DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int32, true)])), + true, + ), + ]); + + // A compatible value for each schema field above, in schema order + let json_values = vec![ + json!(null), + json!(true), + json!(42), + json!(1.234), + json!("hi"), + json!("ho"), + 
json!("1970-01-01T00:00:00+02:00"), + json!([1, "ho", 3]), + json!({"k": "value"}), + json!({"a": 1}), + ]; + + // Create a set of JSON rows that rotates each value past every field + let json: Vec<_> = (0..json_values.len()) + .map(|i| { + let pairs = json_values[i..] + .iter() + .chain(json_values[..i].iter()) + .zip(&schema.fields) + .map(|(v, f)| (f.name().to_string(), v.clone())) + .collect(); + serde_json::Value::Object(pairs) + }) + .collect(); + let mut decoder = ReaderBuilder::new(Arc::new(schema)) + .with_ignore_type_conflicts(true) + .with_coerce_primitive(true) + .build_decoder() + .unwrap(); + decoder.serialize(&json).unwrap(); + let batch = decoder.flush().unwrap().unwrap(); + assert_eq!(batch.num_rows(), 10); + assert_eq!(batch.num_columns(), 10); + + // NOTE: NullArray doesn't materialize any values (they're all NULL by definition) + let _ = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + + assert!(batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .eq([ + Some(true), + None, + None, + None, + None, + None, + None, + None, + None, + None + ])); + + assert!(batch.column(2).as_primitive::().iter().eq([ + Some(42), + Some(1), + None, + None, + None, + None, + None, + None, + None, + None + ])); + + assert!(batch.column(3).as_primitive::().iter().eq([ + Some(1234), + None, + None, + None, + None, + None, + None, + None, + None, + Some(42000) + ])); + + assert!(batch + .column(4) + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .eq([ + Some("hi"), + Some("ho"), + Some("1970-01-01T00:00:00+02:00"), + None, + None, + None, + None, + Some("true"), + Some("42"), + Some("1.234"), + ])); + + assert!(batch + .column(5) + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .eq([ + Some("ho"), + Some("1970-01-01T00:00:00+02:00"), + None, + None, + None, + None, + Some("true"), + Some("42"), + Some("1.234"), + Some("hi"), + ])); + + assert!(batch + .column(6) + .as_primitive::() + .iter() + .eq([ + Some(-7200), + 
None, + None, + None, + None, + None, + Some(42), + None, + None, + None, + ])); + + let arrays = batch + .column(7) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!( + arrays.nulls(), + Some(&NullBuffer::from( + &[true, false, false, false, false, false, false, false, false, false][..] + )) + ); + assert_eq!(arrays.offsets()[1], 3); + let array_values = arrays + .values() + .as_any() + .downcast_ref::() + .unwrap(); + assert!(array_values.iter().eq([Some(1), None, Some(3)])); + + let maps = batch.column(8).as_any().downcast_ref::().unwrap(); + assert_eq!( + maps.nulls(), + Some(&NullBuffer::from( + // Both map and struct can parse + &[true, true, false, false, false, false, false, false, false, false][..] + )) + ); + let map_keys = maps.keys().as_any().downcast_ref::().unwrap(); + assert!(map_keys.iter().eq([Some("k"), Some("a")])); + let map_values = maps + .values() + .as_any() + .downcast_ref::() + .unwrap(); + assert!(map_values.iter().eq([Some("value"), Some("1")])); + + let structs = batch + .column(9) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!( + structs.nulls(), + Some(&NullBuffer::from( + // Both map and struct can parse + &[true, false, false, false, false, false, false, false, false, true][..] 
+ )) + ); + let struct_fields = structs + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert!(struct_fields.slice(0, 2).iter().eq([Some(1), None])); + } + + #[test] + fn test_type_conflict_non_nullable() { + let fields = [ + Field::new("bool", DataType::Boolean, false), + Field::new("primitive", DataType::Int32, false), + Field::new("numeric", DataType::Decimal128(10, 3), false), + Field::new("string", DataType::Utf8, false), + Field::new("string_view", DataType::Utf8View, false), + Field::new( + "timestamp", + DataType::Timestamp(TimeUnit::Second, None), + false, + ), + Field::new( + "array", + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), + false, + ), + Field::new( + "map", + DataType::Map( + Arc::new(Field::new( + "entries", + DataType::Struct(Fields::from(vec![ + Field::new("keys", DataType::Utf8, false), + Field::new("values", DataType::Utf8, true), + ])), + false, // not nullable + )), + false, // not sorted + ), + false, // not nullable + ), + Field::new( + "struct", + DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int32, true)])), + false, + ), + ]; + + // Every field above will have a type conflict with at least one of these values + let json_values = vec![json!(true), json!({"a": 1})]; + + for field in fields { + let mut decoder = ReaderBuilder::new_with_field(field) + .with_ignore_type_conflicts(true) + .build_decoder() + .unwrap(); + decoder.serialize(&json_values).unwrap(); + decoder + .flush() + .expect_err("type conflict on non-nullable type"); + } + } + + #[test] + fn test_ignore_type_conflicts_disabled() { + let fields = [ + Field::new("null", DataType::Null, true), + Field::new("bool", DataType::Boolean, true), + Field::new("primitive", DataType::Int32, true), + Field::new("numeric", DataType::Decimal128(10, 3), true), + Field::new("string", DataType::Utf8, true), + Field::new("string_view", DataType::Utf8View, true), + Field::new( + "timestamp", + DataType::Timestamp(TimeUnit::Second, None), + 
true, + ), + Field::new( + "array", + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), + true, + ), + Field::new( + "map", + DataType::Map( + Arc::new(Field::new( + "entries", + DataType::Struct(Fields::from(vec![ + Field::new("keys", DataType::Utf8, false), + Field::new("values", DataType::Utf8, true), + ])), + false, // not nullable + )), + false, // not sorted + ), + true, // not nullable + ), + Field::new( + "struct", + DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int32, true)])), + true, + ), + ]; + + // Every field above will have a type conflict with at least one of these values + let json_values = vec![json!(true), json!({"a": 1})]; + + for field in fields { + let mut decoder = ReaderBuilder::new_with_field(field) + .build_decoder() + .unwrap(); + decoder.serialize(&json_values).unwrap(); + decoder + .flush() + .expect_err("type conflict on non-nullable type"); + } + } } diff --git a/arrow-json/src/reader/null_array.rs b/arrow-json/src/reader/null_array.rs index 4270045fb3c2..b931d7ec519f 100644 --- a/arrow-json/src/reader/null_array.rs +++ b/arrow-json/src/reader/null_array.rs @@ -21,13 +21,24 @@ use arrow_data::{ArrayData, ArrayDataBuilder}; use arrow_schema::{ArrowError, DataType}; #[derive(Default)] -pub struct NullArrayDecoder {} +pub struct NullArrayDecoder { + ignore_type_conflicts: bool, +} +impl NullArrayDecoder { + pub fn new(ignore_type_conflicts: bool) -> Self { + Self { + ignore_type_conflicts, + } + } +} impl ArrayDecoder for NullArrayDecoder { fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { - for p in pos { - if !matches!(tape.get(*p), TapeElement::Null) { - return Err(tape.error(*p, "null")); + if !self.ignore_type_conflicts { + for p in pos { + if !matches!(tape.get(*p), TapeElement::Null) { + return Err(tape.error(*p, "null")); + } } } ArrayDataBuilder::new(DataType::Null).len(pos.len()).build() diff --git a/arrow-json/src/reader/primitive_array.rs 
b/arrow-json/src/reader/primitive_array.rs index 257c216cf5f6..afbb46dbbc11 100644 --- a/arrow-json/src/reader/primitive_array.rs +++ b/arrow-json/src/reader/primitive_array.rs @@ -75,14 +75,16 @@ impl ParseJsonNumber for f64 { pub struct PrimitiveArrayDecoder { data_type: DataType, + ignore_type_conflicts: bool, // Invariant and Send phantom: PhantomData P>, } impl PrimitiveArrayDecoder
<P>
{ - pub fn new(data_type: DataType) -> Self { + pub fn new(data_type: DataType, ignore_type_conflicts: bool) -> Self { Self { data_type, + ignore_type_conflicts, phantom: Default::default(), } } @@ -98,6 +100,18 @@ where PrimitiveBuilder::
<P>
::with_capacity(pos.len()).with_data_type(self.data_type.clone()); let d = &self.data_type; + // Factor out this logic to simplify call sites below; the compiler will inline it, + // producing a highly predictable branch whose cost should be trivial compared to the + // expensive and unpredictably branchy string parse that immediately precedes each call. + let append = |builder: &mut PrimitiveBuilder
<P>
, value| { + match value { + Ok(value) => builder.append_value(value), + Err(_) if self.ignore_type_conflicts => builder.append_null(), + Err(e) => return Err(e), + }; + Ok(()) + }; + for p in pos { match tape.get(*p) { TapeElement::Null => builder.append_null(), @@ -105,38 +119,36 @@ where let s = tape.get_string(idx); let value = P::parse(s).ok_or_else(|| { ArrowError::JsonError(format!("failed to parse \"{s}\" as {d}",)) - })?; - - builder.append_value(value) + }); + append(&mut builder, value)? } TapeElement::Number(idx) => { let s = tape.get_string(idx); let value = ParseJsonNumber::parse(s.as_bytes()).ok_or_else(|| { ArrowError::JsonError(format!("failed to parse {s} as {d}",)) - })?; - - builder.append_value(value) + }); + append(&mut builder, value)? } TapeElement::F32(v) => { let v = f32::from_bits(v); let value = NumCast::from(v).ok_or_else(|| { ArrowError::JsonError(format!("failed to parse {v} as {d}",)) - })?; - builder.append_value(value) + }); + append(&mut builder, value)? } TapeElement::I32(v) => { let value = NumCast::from(v).ok_or_else(|| { ArrowError::JsonError(format!("failed to parse {v} as {d}",)) - })?; - builder.append_value(value) + }); + append(&mut builder, value)? } TapeElement::F64(high) => match tape.get(p + 1) { TapeElement::F32(low) => { let v = f64::from_bits(((high as u64) << 32) | low as u64); let value = NumCast::from(v).ok_or_else(|| { ArrowError::JsonError(format!("failed to parse {v} as {d}",)) - })?; - builder.append_value(value) + }); + append(&mut builder, value)? } _ => unreachable!(), }, @@ -145,11 +157,12 @@ where let v = ((high as i64) << 32) | (low as u32) as i64; let value = NumCast::from(v).ok_or_else(|| { ArrowError::JsonError(format!("failed to parse {v} as {d}",)) - })?; - builder.append_value(value) + }); + append(&mut builder, value)? 
} _ => unreachable!(), }, + _ if self.ignore_type_conflicts => builder.append_null(), _ => return Err(tape.error(*p, "primitive")), } } diff --git a/arrow-json/src/reader/string_array.rs b/arrow-json/src/reader/string_array.rs index 03d07ad8c8b3..476f7b2696c9 100644 --- a/arrow-json/src/reader/string_array.rs +++ b/arrow-json/src/reader/string_array.rs @@ -29,13 +29,15 @@ const FALSE: &str = "false"; pub struct StringArrayDecoder { coerce_primitive: bool, + ignore_type_conflicts: bool, phantom: PhantomData, } impl StringArrayDecoder { - pub fn new(coerce_primitive: bool) -> Self { + pub fn new(coerce_primitive: bool, ignore_type_conflicts: bool) -> Self { Self { coerce_primitive, + ignore_type_conflicts, phantom: Default::default(), } } @@ -70,6 +72,7 @@ impl ArrayDecoder for StringArrayDecoder { // An arbitrary estimate data_capacity += 10; } + _ if self.ignore_type_conflicts => {} _ => { return Err(tape.error(*p, "string")); } @@ -120,6 +123,7 @@ impl ArrayDecoder for StringArrayDecoder { } _ => unreachable!(), }, + _ if self.ignore_type_conflicts => builder.append_null(), _ => unreachable!(), } } diff --git a/arrow-json/src/reader/string_view_array.rs b/arrow-json/src/reader/string_view_array.rs index 8aeb1c805899..cc47e3850b9a 100644 --- a/arrow-json/src/reader/string_view_array.rs +++ b/arrow-json/src/reader/string_view_array.rs @@ -30,11 +30,15 @@ const FALSE: &str = "false"; pub struct StringViewArrayDecoder { coerce_primitive: bool, + ignore_type_conflicts: bool, } impl StringViewArrayDecoder { - pub fn new(coerce_primitive: bool) -> Self { - Self { coerce_primitive } + pub fn new(coerce_primitive: bool, ignore_type_conflicts: bool) -> Self { + Self { + coerce_primitive, + ignore_type_conflicts, + } } } @@ -99,6 +103,7 @@ impl ArrayDecoder for StringViewArrayDecoder { TapeElement::F64(_) if coerce => { data_capacity += 10; } + _ if self.ignore_type_conflicts => {} // treat type conflicts like nulls _ => { return Err(tape.error(p, "string")); } @@ -155,6 
+160,9 @@ impl ArrayDecoder for StringViewArrayDecoder { } _ => unreachable!(), }, + _ if self.ignore_type_conflicts => { + builder.append_null(); + } _ => unreachable!(), } } diff --git a/arrow-json/src/reader/struct_array.rs b/arrow-json/src/reader/struct_array.rs index b9408df77a43..c168ad699efa 100644 --- a/arrow-json/src/reader/struct_array.rs +++ b/arrow-json/src/reader/struct_array.rs @@ -26,6 +26,7 @@ pub struct StructArrayDecoder { data_type: DataType, decoders: Vec>, strict_mode: bool, + ignore_type_conflicts: bool, is_nullable: bool, struct_mode: StructMode, } @@ -35,6 +36,7 @@ impl StructArrayDecoder { data_type: DataType, coerce_primitive: bool, strict_mode: bool, + ignore_type_conflicts: bool, is_nullable: bool, struct_mode: StructMode, ) -> Result { @@ -49,6 +51,7 @@ impl StructArrayDecoder { f.data_type().clone(), coerce_primitive, strict_mode, + ignore_type_conflicts, nullable, struct_mode, ) @@ -59,6 +62,7 @@ impl StructArrayDecoder { data_type, decoders, strict_mode, + ignore_type_conflicts, is_nullable, struct_mode, }) @@ -89,6 +93,10 @@ impl ArrayDecoder for StructArrayDecoder { nulls.append(false); continue; } + (_, Some(nulls)) if self.ignore_type_conflicts => { + nulls.append(false); + continue; + } (_, _) => return Err(tape.error(*p, "{")), }; @@ -129,6 +137,10 @@ impl ArrayDecoder for StructArrayDecoder { nulls.append(false); continue; } + (_, Some(nulls)) if self.ignore_type_conflicts => { + nulls.append(false); + continue; + } (_, _) => return Err(tape.error(*p, "[")), }; diff --git a/arrow-json/src/reader/timestamp_array.rs b/arrow-json/src/reader/timestamp_array.rs index ee9018702920..b9298f9c23f4 100644 --- a/arrow-json/src/reader/timestamp_array.rs +++ b/arrow-json/src/reader/timestamp_array.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-use chrono::TimeZone; +use chrono::{DateTime, TimeZone}; use std::marker::PhantomData; use arrow_array::builder::PrimitiveBuilder; @@ -32,15 +32,17 @@ use crate::reader::ArrayDecoder; pub struct TimestampArrayDecoder { data_type: DataType, timezone: Tz, + ignore_type_conflicts: bool, // Invariant and Send phantom: PhantomData P>, } impl TimestampArrayDecoder { - pub fn new(data_type: DataType, timezone: Tz) -> Self { + pub fn new(data_type: DataType, timezone: Tz, ignore_type_conflicts: bool) -> Self { Self { data_type, timezone, + ignore_type_conflicts, phantom: Default::default(), } } @@ -55,6 +57,18 @@ where let mut builder = PrimitiveBuilder::
<P>
::with_capacity(pos.len()).with_data_type(self.data_type.clone()); + // Factor out this logic to simplify call sites below; the compiler will inline it, + // producing a highly predictable branch whose cost should be trivial compared to the + // expensive and unpredictably branchy string parse that immediately precedes each call. + let append = |builder: &mut PrimitiveBuilder
<P>
, value| { + match value { + Ok(value) => builder.append_value(value), + Err(_) if self.ignore_type_conflicts => builder.append_null(), + Err(e) => return Err(e), + }; + Ok(()) + }; + for p in pos { match tape.get(*p) { TapeElement::Null => builder.append_null(), @@ -65,20 +79,20 @@ where "failed to parse \"{s}\" as {}: {}", self.data_type, e )) - })?; + }); - let value = match P::UNIT { - TimeUnit::Second => date.timestamp(), - TimeUnit::Millisecond => date.timestamp_millis(), - TimeUnit::Microsecond => date.timestamp_micros(), + let date_to_value = |date: DateTime| match P::UNIT { + TimeUnit::Second => Ok(date.timestamp()), + TimeUnit::Millisecond => Ok(date.timestamp_millis()), + TimeUnit::Microsecond => Ok(date.timestamp_micros()), TimeUnit::Nanosecond => date.timestamp_nanos_opt().ok_or_else(|| { ArrowError::ParseError(format!( "{} would overflow 64-bit signed nanoseconds", date.to_rfc3339(), )) - })?, + }), }; - builder.append_value(value) + append(&mut builder, date.and_then(date_to_value))? } TapeElement::Number(idx) => { let s = tape.get_string(idx); @@ -90,9 +104,8 @@ where "failed to parse {s} as {}", self.data_type )) - })?; - - builder.append_value(value) + }); + append(&mut builder, value)? } TapeElement::I32(v) => builder.append_value(v as i64), TapeElement::I64(high) => match tape.get(p + 1) { @@ -101,6 +114,7 @@ where } _ => unreachable!(), }, + _ if self.ignore_type_conflicts => builder.append_null(), _ => return Err(tape.error(*p, "primitive")), } }