Skip to content

Commit bdba08a

Browse files
committed
feat: support LargeList in make_array and
array_length
1 parent f390f15 commit bdba08a

File tree

5 files changed

+125
-43
lines changed

5 files changed

+125
-43
lines changed

datafusion/common/src/scalar.rs

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1725,7 +1725,11 @@ impl ScalarValue {
17251725
} else {
17261726
Self::iter_to_array(values.iter().cloned()).unwrap()
17271727
};
1728-
Arc::new(array_into_list_array(values))
1728+
if values.len() <= i32::MAX as usize {
1729+
Arc::new(array_into_list_array::<i32>(values))
1730+
} else {
1731+
Arc::new(array_into_list_array::<i64>(values))
1732+
}
17291733
}
17301734

17311735
/// Converts a scalar value into an array of `size` rows.
@@ -2127,7 +2131,7 @@ impl ScalarValue {
21272131
let list_array = as_list_array(array);
21282132
let nested_array = list_array.value(index);
21292133
// Produces a single element `ListArray` with the value at `index`.
2130-
let arr = Arc::new(array_into_list_array(nested_array));
2134+
let arr = Arc::new(array_into_list_array::<i32>(nested_array));
21312135

21322136
ScalarValue::List(arr)
21332137
}
@@ -2136,7 +2140,7 @@ impl ScalarValue {
21362140
let list_array = as_fixed_size_list_array(array)?;
21372141
let nested_array = list_array.value(index);
21382142
// Produces a single element `ListArray` with the value at `index`.
2139-
let arr = Arc::new(array_into_list_array(nested_array));
2143+
let arr = Arc::new(array_into_list_array::<i32>(nested_array));
21402144

21412145
ScalarValue::List(arr)
21422146
}
@@ -3176,10 +3180,12 @@ mod tests {
31763180

31773181
#[test]
31783182
fn iter_to_array_string_test() {
3179-
let arr1 =
3180-
array_into_list_array(Arc::new(StringArray::from(vec!["foo", "bar", "baz"])));
3181-
let arr2 =
3182-
array_into_list_array(Arc::new(StringArray::from(vec!["rust", "world"])));
3183+
let arr1 = array_into_list_array::<i32>(Arc::new(StringArray::from(vec![
3184+
"foo", "bar", "baz",
3185+
])));
3186+
let arr2 = array_into_list_array::<i32>(Arc::new(StringArray::from(vec![
3187+
"rust", "world",
3188+
])));
31833189

31843190
let scalars = vec![
31853191
ScalarValue::List(Arc::new(arr1)),
@@ -4436,13 +4442,13 @@ mod tests {
44364442
// Define list-of-structs scalars
44374443

44384444
let nl0_array = ScalarValue::iter_to_array(vec![s0.clone(), s1.clone()]).unwrap();
4439-
let nl0 = ScalarValue::List(Arc::new(array_into_list_array(nl0_array)));
4445+
let nl0 = ScalarValue::List(Arc::new(array_into_list_array::<i32>(nl0_array)));
44404446

44414447
let nl1_array = ScalarValue::iter_to_array(vec![s2.clone()]).unwrap();
4442-
let nl1 = ScalarValue::List(Arc::new(array_into_list_array(nl1_array)));
4448+
let nl1 = ScalarValue::List(Arc::new(array_into_list_array::<i32>(nl1_array)));
44434449

44444450
let nl2_array = ScalarValue::iter_to_array(vec![s1.clone()]).unwrap();
4445-
let nl2 = ScalarValue::List(Arc::new(array_into_list_array(nl2_array)));
4451+
let nl2 = ScalarValue::List(Arc::new(array_into_list_array::<i32>(nl2_array)));
44464452

44474453
// iter_to_array for list-of-struct
44484454
let array = ScalarValue::iter_to_array(vec![nl0, nl1, nl2]).unwrap();

datafusion/common/src/utils.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use arrow::compute;
2525
use arrow::compute::{partition, SortColumn, SortOptions};
2626
use arrow::datatypes::{Field, SchemaRef, UInt32Type};
2727
use arrow::record_batch::RecordBatch;
28-
use arrow_array::{Array, ListArray};
28+
use arrow_array::{Array, GenericListArray, ListArray, OffsetSizeTrait};
2929
use sqlparser::ast::Ident;
3030
use sqlparser::dialect::GenericDialect;
3131
use sqlparser::parser::Parser;
@@ -339,9 +339,9 @@ pub fn longest_consecutive_prefix<T: Borrow<usize>>(
339339

340340
/// Wrap an array into a single element `ListArray`.
341341
/// For example `[1, 2, 3]` would be converted into `[[1, 2, 3]]`
342-
pub fn array_into_list_array(arr: ArrayRef) -> ListArray {
342+
pub fn array_into_list_array<O: OffsetSizeTrait>(arr: ArrayRef) -> GenericListArray<O> {
343343
let offsets = OffsetBuffer::from_lengths([arr.len()]);
344-
ListArray::new(
344+
GenericListArray::<O>::new(
345345
Arc::new(Field::new("item", arr.data_type().to_owned(), true)),
346346
offsets,
347347
arr,

datafusion/physical-expr/src/aggregate/array_agg.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ impl Accumulator for ArrayAggAccumulator {
169169
}
170170

171171
let concated_array = arrow::compute::concat(&element_arrays)?;
172-
let list_array = array_into_list_array(concated_array);
172+
let list_array = array_into_list_array::<i32>(concated_array);
173173

174174
Ok(ScalarValue::List(Arc::new(list_array)))
175175
}

datafusion/physical-expr/src/aggregate/array_agg_distinct.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ mod tests {
206206
};
207207

208208
let arr = arrow::compute::sort(&arr, None).unwrap();
209-
let list_arr = array_into_list_array(arr);
209+
let list_arr = array_into_list_array::<i32>(arr);
210210
ScalarValue::List(Arc::new(list_arr))
211211
}
212212

datafusion/physical-expr/src/array_expressions.rs

Lines changed: 104 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ use arrow_buffer::NullBuffer;
2929

3030
use arrow_schema::FieldRef;
3131
use datafusion_common::cast::{
32-
as_generic_string_array, as_int64_array, as_list_array, as_string_array,
32+
as_generic_list_array, as_generic_string_array, as_int64_array, as_list_array,
33+
as_string_array,
3334
};
3435
use datafusion_common::utils::array_into_list_array;
3536
use datafusion_common::{
@@ -82,19 +83,22 @@ macro_rules! new_builder {
8283
}};
8384
}
8485

85-
/// Combines multiple arrays into a single ListArray
86+
/// Combines multiple arrays into a single ListArray or LargeListArray
8687
///
8788
/// $ARGS: slice of arrays, each with $ARRAY_TYPE
8889
/// $ARRAY_TYPE: the type of the list elements
8990
/// $BUILDER_TYPE: the type of ArrayBuilder for the list elements
91+
/// $OFFSIZE: the type of OffsetSizeTrait for the list elements
9092
///
9193
/// Returns: a ListArray where the elements each have the same type as
9294
/// $ARRAY_TYPE and each element have a length of $ARGS.len()
9395
macro_rules! array {
94-
($ARGS:expr, $ARRAY_TYPE:ident, $BUILDER_TYPE:ident) => {{
96+
($ARGS:expr, $ARRAY_TYPE:ident, $BUILDER_TYPE:ident, $OFFSIZE: ident) => {{
9597
let builder = new_builder!($BUILDER_TYPE, $ARGS[0].len());
96-
let mut builder =
97-
ListBuilder::<$BUILDER_TYPE>::with_capacity(builder, $ARGS.len());
98+
let mut builder = GenericListBuilder::<$OFFSIZE, $BUILDER_TYPE>::with_capacity(
99+
builder,
100+
$ARGS.len(),
101+
);
98102

99103
let num_rows = $ARGS[0].len();
100104
assert!(
@@ -311,13 +315,16 @@ macro_rules! call_array_function {
311315
/// └──────────────┘ └──────────────┘ └─────────────────────────────┘
312316
/// col1 col2 output
313317
/// ```
314-
fn array_array(args: &[ArrayRef], data_type: DataType) -> Result<ArrayRef> {
318+
fn array_array<O: OffsetSizeTrait>(
319+
args: &[ArrayRef],
320+
data_type: DataType,
321+
) -> Result<ArrayRef> {
315322
// do not accept 0 arguments.
316323
if args.is_empty() {
317324
return plan_err!("Array requires at least one argument");
318325
}
319326

320-
let res = match data_type {
327+
let res: Arc<GenericListArray<O>> = match data_type {
321328
DataType::List(..) => {
322329
let row_count = args[0].len();
323330
let column_count = args.len();
@@ -372,27 +379,27 @@ fn array_array(args: &[ArrayRef], data_type: DataType) -> Result<ArrayRef> {
372379
.map(|x| x as &dyn Array)
373380
.collect::<Vec<_>>();
374381
let values = compute::concat(elements.as_slice())?;
375-
let list_arr = ListArray::new(
382+
let list_arr = GenericListArray::<O>::new(
376383
field,
377384
OffsetBuffer::from_lengths(list_array_lengths),
378385
values,
379386
Some(NullBuffer::new(buffer)),
380387
);
381388
Arc::new(list_arr)
382389
}
383-
DataType::Utf8 => array!(args, StringArray, StringBuilder),
384-
DataType::LargeUtf8 => array!(args, LargeStringArray, LargeStringBuilder),
385-
DataType::Boolean => array!(args, BooleanArray, BooleanBuilder),
386-
DataType::Float32 => array!(args, Float32Array, Float32Builder),
387-
DataType::Float64 => array!(args, Float64Array, Float64Builder),
388-
DataType::Int8 => array!(args, Int8Array, Int8Builder),
389-
DataType::Int16 => array!(args, Int16Array, Int16Builder),
390-
DataType::Int32 => array!(args, Int32Array, Int32Builder),
391-
DataType::Int64 => array!(args, Int64Array, Int64Builder),
392-
DataType::UInt8 => array!(args, UInt8Array, UInt8Builder),
393-
DataType::UInt16 => array!(args, UInt16Array, UInt16Builder),
394-
DataType::UInt32 => array!(args, UInt32Array, UInt32Builder),
395-
DataType::UInt64 => array!(args, UInt64Array, UInt64Builder),
390+
DataType::Utf8 => array!(args, StringArray, StringBuilder, O),
391+
DataType::LargeUtf8 => array!(args, LargeStringArray, LargeStringBuilder, O),
392+
DataType::Boolean => array!(args, BooleanArray, BooleanBuilder, O),
393+
DataType::Float32 => array!(args, Float32Array, Float32Builder, O),
394+
DataType::Float64 => array!(args, Float64Array, Float64Builder, O),
395+
DataType::Int8 => array!(args, Int8Array, Int8Builder, O),
396+
DataType::Int16 => array!(args, Int16Array, Int16Builder, O),
397+
DataType::Int32 => array!(args, Int32Array, Int32Builder, O),
398+
DataType::Int64 => array!(args, Int64Array, Int64Builder, O),
399+
DataType::UInt8 => array!(args, UInt8Array, UInt8Builder, O),
400+
DataType::UInt16 => array!(args, UInt16Array, UInt16Builder, O),
401+
DataType::UInt32 => array!(args, UInt32Array, UInt32Builder, O),
402+
DataType::UInt64 => array!(args, UInt64Array, UInt64Builder, O),
396403
data_type => {
397404
return not_impl_err!("Array is not implemented for type '{data_type:?}'.")
398405
}
@@ -412,13 +419,28 @@ pub fn make_array(arrays: &[ArrayRef]) -> Result<ArrayRef> {
412419
}
413420
}
414421

422+
let len = arrays.len();
415423
match data_type {
416424
// Either an empty array or all nulls:
417425
DataType::Null => {
418426
let array = new_null_array(&DataType::Null, arrays.len());
419-
Ok(Arc::new(array_into_list_array(array)))
427+
if len <= i32::MAX as usize {
428+
Ok(Arc::new(array_into_list_array::<i32>(array)))
429+
} else if len <= i64::MAX as usize {
430+
Ok(Arc::new(array_into_list_array::<i64>(array)))
431+
} else {
432+
exec_err!("The number of elements {} in the array exceed the maximum number of elements supported by DataFusion",len)
433+
}
434+
}
435+
data_type => {
436+
if len <= i32::MAX as usize {
437+
array_array::<i32>(arrays, data_type)
438+
} else if len <= i64::MAX as usize {
439+
array_array::<i64>(arrays, data_type)
440+
} else {
441+
exec_err!("The number of elements {} in the array exceed the maximum number of elements supported by DataFusion",len)
442+
}
420443
}
421-
data_type => array_array(arrays, data_type),
422444
}
423445
}
424446

@@ -1688,7 +1710,20 @@ pub fn flatten(args: &[ArrayRef]) -> Result<ArrayRef> {
16881710

16891711
/// Array_length SQL function
16901712
pub fn array_length(args: &[ArrayRef]) -> Result<ArrayRef> {
1691-
let list_array = as_list_array(&args[0])?;
1713+
match &args[0].data_type() {
1714+
DataType::List(_) => _array_length_list::<i32>(args),
1715+
DataType::LargeList(_) => _array_length_list::<i64>(args),
1716+
_ => Err(DataFusionError::Internal(format!(
1717+
"array_length does not support type '{:?}'",
1718+
args[0].data_type()
1719+
))),
1720+
}
1721+
}
1722+
1723+
/// array_length for List and LargeList
1724+
fn _array_length_list<O: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> {
1725+
let list_array = as_generic_list_array::<O>(&args[0])?;
1726+
16921727
let dimension = if args.len() == 2 {
16931728
as_int64_array(&args[1])?.clone()
16941729
} else {
@@ -2048,8 +2083,10 @@ mod tests {
20482083
Some(vec![Some(6), Some(7), Some(8)]),
20492084
]));
20502085

2051-
let array2d_1 = Arc::new(array_into_list_array(array1d_1.clone())) as ArrayRef;
2052-
let array2d_2 = Arc::new(array_into_list_array(array1d_2.clone())) as ArrayRef;
2086+
let array2d_1 =
2087+
Arc::new(array_into_list_array::<i32>(array1d_1.clone())) as ArrayRef;
2088+
let array2d_2 =
2089+
Arc::new(array_into_list_array::<i32>(array1d_2.clone())) as ArrayRef;
20532090

20542091
let res =
20552092
align_array_dimensions(vec![array1d_1.to_owned(), array2d_2.to_owned()])
@@ -2063,8 +2100,8 @@ mod tests {
20632100
expected_dim
20642101
);
20652102

2066-
let array3d_1 = Arc::new(array_into_list_array(array2d_1)) as ArrayRef;
2067-
let array3d_2 = array_into_list_array(array2d_2.to_owned());
2103+
let array3d_1 = Arc::new(array_into_list_array::<i32>(array2d_1)) as ArrayRef;
2104+
let array3d_2 = array_into_list_array::<i32>(array2d_2.to_owned());
20682105
let res =
20692106
align_array_dimensions(vec![array1d_1, Arc::new(array3d_2.clone())]).unwrap();
20702107

@@ -2121,6 +2158,12 @@ mod tests {
21212158
);
21222159
}
21232160

2161+
#[test]
2162+
fn test_return_large_array() {
2163+
let list_array = return_large_array();
2164+
assert_eq!(list_array.data_type().to_string(), "failed to cast to");
2165+
}
2166+
21242167
#[test]
21252168
fn test_array_element() {
21262169
// array_element([1, 2, 3, 4], 1) = 1
@@ -2867,6 +2910,8 @@ mod tests {
28672910
fn test_array_length() {
28682911
// array_length([1, 2, 3, 4]) = 4
28692912
let list_array = return_array();
2913+
// cast thr list_array to LargeList
2914+
println!("{:?}", list_array.data_type());
28702915
let arr = array_length(&[list_array.clone()])
28712916
.expect("failed to initialize function array_ndims");
28722917
let result =
@@ -2875,6 +2920,16 @@ mod tests {
28752920
assert_eq!(result, &UInt64Array::from_value(4, 1));
28762921

28772922
// array_length([1, 2, 3, 4], 1) = 4
2923+
let array =
2924+
array_length(&[list_array.clone(), Arc::new(Int64Array::from_value(1, 1))])
2925+
.expect("failed to initialize function array_ndims");
2926+
let result =
2927+
as_uint64_array(&array).expect("failed to initialize function array_ndims");
2928+
2929+
assert_eq!(result, &UInt64Array::from_value(4, 1));
2930+
2931+
// for LargeList
2932+
// array_length([1, 2, 3, 4], 2) = 4
28782933
let array = array_length(&[list_array, Arc::new(Int64Array::from_value(1, 1))])
28792934
.expect("failed to initialize function array_ndims");
28802935
let result =
@@ -3015,6 +3070,27 @@ mod tests {
30153070
make_array(&args).expect("failed to initialize function array")
30163071
}
30173072

3073+
fn return_large_array() -> ArrayRef {
3074+
// Returns: [1, 2, 3, 4]
3075+
let capacity = i32::MAX as usize + 10;
3076+
let args = vec![Arc::new(Int64Array::from(vec![Some(1)])) as ArrayRef; capacity];
3077+
3078+
println!("args.len() = {}", args.len());
3079+
3080+
make_array(&args).expect("failed to initialize function array")
3081+
}
3082+
3083+
fn return_extra_array() -> ArrayRef {
3084+
// Returns: [11, 12, 13, 14]
3085+
let args = [
3086+
Arc::new(Int64Array::from(vec![Some(11)])) as ArrayRef,
3087+
Arc::new(Int64Array::from(vec![Some(12)])) as ArrayRef,
3088+
Arc::new(Int64Array::from(vec![Some(13)])) as ArrayRef,
3089+
Arc::new(Int64Array::from(vec![Some(14)])) as ArrayRef,
3090+
];
3091+
make_array(&args).expect("failed to initialize function array")
3092+
}
3093+
30183094
fn return_nested_array() -> ArrayRef {
30193095
// Returns: [[1, 2, 3, 4], [5, 6, 7, 8]]
30203096
let args = [

0 commit comments

Comments
 (0)