Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 8 additions & 12 deletions datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1536,12 +1536,10 @@ mod test {
.unwrap()
.resolve(&schema)
.unwrap();
let r4 = apache_avro::to_value(serde_json::json!({
"col1": null
}))
.unwrap()
.resolve(&schema)
.unwrap();
let r4 = apache_avro::to_value(serde_json::json!({ "col1": null }))
.unwrap()
.resolve(&schema)
.unwrap();

let mut w = apache_avro::Writer::new(&schema, vec![]);
w.append(r1).unwrap();
Expand Down Expand Up @@ -1600,12 +1598,10 @@ mod test {
}"#,
)
.unwrap();
let r1 = apache_avro::to_value(serde_json::json!({
"col1": null
}))
.unwrap()
.resolve(&schema)
.unwrap();
let r1 = apache_avro::to_value(serde_json::json!({ "col1": null }))
.unwrap()
.resolve(&schema)
.unwrap();
let r2 = apache_avro::to_value(serde_json::json!({
"col1": {
"col2": "hello"
Expand Down
190 changes: 78 additions & 112 deletions datafusion/physical-expr/src/array_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -577,58 +577,6 @@ pub fn array_pop_back(args: &[ArrayRef]) -> Result<ArrayRef> {
)
}

macro_rules! append {
($ARRAY:expr, $ELEMENT:expr, $ARRAY_TYPE:ident) => {{
let mut offsets: Vec<i32> = vec![0];
let mut values =
downcast_arg!(new_empty_array($ELEMENT.data_type()), $ARRAY_TYPE).clone();

let element = downcast_arg!($ELEMENT, $ARRAY_TYPE);
for (arr, el) in $ARRAY.iter().zip(element.iter()) {
let last_offset: i32 = offsets.last().copied().ok_or_else(|| {
DataFusionError::Internal(format!("offsets should not be empty"))
})?;
match arr {
Some(arr) => {
let child_array = downcast_arg!(arr, $ARRAY_TYPE);
values = downcast_arg!(
compute::concat(&[
&values,
child_array,
&$ARRAY_TYPE::from(vec![el])
])?
.clone(),
$ARRAY_TYPE
)
.clone();
offsets.push(last_offset + child_array.len() as i32 + 1i32);
}
None => {
values = downcast_arg!(
compute::concat(&[
&values,
&$ARRAY_TYPE::from(vec![el.clone()])
])?
.clone(),
$ARRAY_TYPE
)
.clone();
offsets.push(last_offset + 1i32);
}
}
}

let field = Arc::new(Field::new("item", $ELEMENT.data_type().clone(), true));

Arc::new(ListArray::try_new(
field,
OffsetBuffer::new(offsets.into()),
Arc::new(values),
None,
)?)
}};
}

/// Array_append SQL function
pub fn array_append(args: &[ArrayRef]) -> Result<ArrayRef> {
let arr = as_list_array(&args[0])?;
Expand All @@ -639,68 +587,51 @@ pub fn array_append(args: &[ArrayRef]) -> Result<ArrayRef> {
DataType::List(_) => concat_internal(args)?,
DataType::Null => return make_array(&[element.to_owned()]),
data_type => {
macro_rules! array_function {
($ARRAY_TYPE:ident) => {
append!(arr, element, $ARRAY_TYPE)
let mut new_values = vec![];
let mut offsets = vec![0];

let elem_data = element.to_data();
for (row_index, arr) in arr.iter().enumerate() {
let new_array = if let Some(arr) = arr {
let original_data = arr.to_data();
let capacity = Capacities::Array(original_data.len() + 1);
let mut mutable = MutableArrayData::with_capacities(
vec![&original_data, &elem_data],
false,
capacity,
);
mutable.extend(0, 0, original_data.len());
mutable.extend(1, row_index, row_index + 1);
let data = mutable.freeze();
arrow_array::make_array(data)
} else {
let capacity = Capacities::Array(1);
let mut mutable = MutableArrayData::with_capacities(
vec![&elem_data],
false,
capacity,
);
mutable.extend(0, row_index, row_index + 1);
let data = mutable.freeze();
arrow_array::make_array(data)
};
offsets.push(offsets[row_index] + new_array.len() as i32);
new_values.push(new_array);
}
call_array_function!(data_type, false)
}
};

Ok(res)
}

macro_rules! prepend {
($ARRAY:expr, $ELEMENT:expr, $ARRAY_TYPE:ident) => {{
let mut offsets: Vec<i32> = vec![0];
let mut values =
downcast_arg!(new_empty_array($ELEMENT.data_type()), $ARRAY_TYPE).clone();
let new_values: Vec<_> = new_values.iter().map(|a| a.as_ref()).collect();
let values = arrow::compute::concat(&new_values)?;

let element = downcast_arg!($ELEMENT, $ARRAY_TYPE);
for (arr, el) in $ARRAY.iter().zip(element.iter()) {
let last_offset: i32 = offsets.last().copied().ok_or_else(|| {
DataFusionError::Internal(format!("offsets should not be empty"))
})?;
match arr {
Some(arr) => {
let child_array = downcast_arg!(arr, $ARRAY_TYPE);
values = downcast_arg!(
compute::concat(&[
&values,
&$ARRAY_TYPE::from(vec![el]),
child_array
])?
.clone(),
$ARRAY_TYPE
)
.clone();
offsets.push(last_offset + child_array.len() as i32 + 1i32);
}
None => {
values = downcast_arg!(
compute::concat(&[
&values,
&$ARRAY_TYPE::from(vec![el.clone()])
])?
.clone(),
$ARRAY_TYPE
)
.clone();
offsets.push(last_offset + 1i32);
}
}
Arc::new(ListArray::try_new(
Arc::new(Field::new("item", data_type.to_owned(), true)),
OffsetBuffer::new(offsets.into()),
values,
None,
)?)
}
};

let field = Arc::new(Field::new("item", $ELEMENT.data_type().clone(), true));

Arc::new(ListArray::try_new(
field,
OffsetBuffer::new(offsets.into()),
Arc::new(values),
None,
)?)
}};
Ok(res)
}

/// Array_prepend SQL function
Expand All @@ -713,12 +644,47 @@ pub fn array_prepend(args: &[ArrayRef]) -> Result<ArrayRef> {
DataType::List(_) => concat_internal(args)?,
DataType::Null => return make_array(&[element.to_owned()]),
data_type => {
macro_rules! array_function {
($ARRAY_TYPE:ident) => {
prepend!(arr, element, $ARRAY_TYPE)
let mut new_values = vec![];
let mut offsets = vec![0];

let elem_data = element.to_data();
for (row_index, arr) in arr.iter().enumerate() {
let new_array = if let Some(arr) = arr {
let original_data = arr.to_data();
let capacity = Capacities::Array(original_data.len() + 1);
let mut mutable = MutableArrayData::with_capacities(
vec![&original_data, &elem_data],
false,
capacity,
);
mutable.extend(1, row_index, row_index + 1);
mutable.extend(0, 0, original_data.len());
let data = mutable.freeze();
arrow_array::make_array(data)
} else {
let capacity = Capacities::Array(1);
let mut mutable = MutableArrayData::with_capacities(
vec![&elem_data],
false,
capacity,
);
mutable.extend(0, row_index, row_index + 1);
let data = mutable.freeze();
arrow_array::make_array(data)
};
offsets.push(offsets[row_index] + new_array.len() as i32);
new_values.push(new_array);
}
call_array_function!(data_type, false)

let new_values: Vec<_> = new_values.iter().map(|a| a.as_ref()).collect();
let values = arrow::compute::concat(&new_values)?;

Arc::new(ListArray::try_new(
Arc::new(Field::new("item", data_type.to_owned(), true)),
OffsetBuffer::new(offsets.into()),
values,
None,
)?)
}
};

Expand Down