Skip to content

Commit 79da181

Browse files
jayzhan211appletreeisyellow
authored andcommitted
Remove define_array_slice and reuse array_slice for array_pop_front/back (apache#8401)
* array_element done Signed-off-by: jayzhan211 <[email protected]> * clippy Signed-off-by: jayzhan211 <[email protected]> * replace array_slice Signed-off-by: jayzhan211 <[email protected]> * fix get_indexed_field_empty_list Signed-off-by: jayzhan211 <[email protected]> * replace pop front and pop back Signed-off-by: jayzhan211 <[email protected]> * clippy Signed-off-by: jayzhan211 <[email protected]> * add doc and comment Signed-off-by: jayzhan211 <[email protected]> * fmt Signed-off-by: jayzhan211 <[email protected]> --------- Signed-off-by: jayzhan211 <[email protected]>
1 parent ee84e15 commit 79da181

File tree

2 files changed

+179
-160
lines changed

2 files changed

+179
-160
lines changed

datafusion/physical-expr/src/array_expressions.rs

Lines changed: 178 additions & 159 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
//! Array expressions
1919
2020
use std::any::type_name;
21-
use std::cmp::Ordering;
2221
use std::collections::HashSet;
2322
use std::sync::Arc;
2423

@@ -370,135 +369,64 @@ pub fn make_array(arrays: &[ArrayRef]) -> Result<ArrayRef> {
370369
}
371370
}
372371

373-
fn return_empty(return_null: bool, data_type: DataType) -> Arc<dyn Array> {
374-
if return_null {
375-
new_null_array(&data_type, 1)
376-
} else {
377-
new_empty_array(&data_type)
378-
}
379-
}
380-
381-
fn list_slice<T: Array + 'static>(
382-
array: &dyn Array,
383-
i: i64,
384-
j: i64,
385-
return_element: bool,
386-
) -> ArrayRef {
387-
let array = array.as_any().downcast_ref::<T>().unwrap();
388-
389-
let array_type = array.data_type().clone();
372+
/// array_element SQL function
373+
///
374+
/// There are two arguments for array_element, the first one is the array, the second one is the 1-indexed index.
375+
/// `array_element(array, index)`
376+
///
377+
/// For example:
378+
/// > array_element(\[1, 2, 3], 2) -> 2
379+
pub fn array_element(args: &[ArrayRef]) -> Result<ArrayRef> {
380+
let list_array = as_list_array(&args[0])?;
381+
let indexes = as_int64_array(&args[1])?;
390382

391-
if i == 0 && j == 0 || array.is_empty() {
392-
return return_empty(return_element, array_type);
393-
}
383+
let values = list_array.values();
384+
let original_data = values.to_data();
385+
let capacity = Capacities::Array(original_data.len());
394386

395-
let i = match i.cmp(&0) {
396-
Ordering::Less => {
397-
if i.unsigned_abs() > array.len() as u64 {
398-
return return_empty(true, array_type);
399-
}
387+
// use_nulls: true, we don't construct List for array_element, so we need explicit nulls.
388+
let mut mutable =
389+
MutableArrayData::with_capacities(vec![&original_data], true, capacity);
400390

401-
(array.len() as i64 + i + 1) as usize
402-
}
403-
Ordering::Equal => 1,
404-
Ordering::Greater => i as usize,
405-
};
391+
fn adjusted_array_index(index: i64, len: usize) -> Option<i64> {
392+
// 0 ~ len - 1
393+
let adjusted_zero_index = if index < 0 {
394+
index + len as i64
395+
} else {
396+
index - 1
397+
};
406398

407-
let j = match j.cmp(&0) {
408-
Ordering::Less => {
409-
if j.unsigned_abs() as usize > array.len() {
410-
return return_empty(true, array_type);
411-
}
412-
if return_element {
413-
(array.len() as i64 + j + 1) as usize
414-
} else {
415-
(array.len() as i64 + j) as usize
416-
}
399+
if 0 <= adjusted_zero_index && adjusted_zero_index < len as i64 {
400+
Some(adjusted_zero_index)
401+
} else {
402+
// Out of bounds
403+
None
417404
}
418-
Ordering::Equal => 1,
419-
Ordering::Greater => j.min(array.len() as i64) as usize,
420-
};
421-
422-
if i > j || i > array.len() {
423-
return_empty(return_element, array_type)
424-
} else {
425-
Arc::new(array.slice(i - 1, j + 1 - i))
426405
}
427-
}
428406

429-
fn slice<T: Array + 'static>(
430-
array: &ListArray,
431-
key: &Int64Array,
432-
extra_key: &Int64Array,
433-
return_element: bool,
434-
) -> Result<Arc<dyn Array>> {
435-
let sliced_array: Vec<Arc<dyn Array>> = array
436-
.iter()
437-
.zip(key.iter())
438-
.zip(extra_key.iter())
439-
.map(|((arr, i), j)| match (arr, i, j) {
440-
(Some(arr), Some(i), Some(j)) => list_slice::<T>(&arr, i, j, return_element),
441-
(Some(arr), None, Some(j)) => list_slice::<T>(&arr, 1i64, j, return_element),
442-
(Some(arr), Some(i), None) => {
443-
list_slice::<T>(&arr, i, arr.len() as i64, return_element)
444-
}
445-
(Some(arr), None, None) if !return_element => arr.clone(),
446-
_ => return_empty(return_element, array.value_type()),
447-
})
448-
.collect();
407+
for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() {
408+
let start = offset_window[0] as usize;
409+
let end = offset_window[1] as usize;
410+
let len = end - start;
449411

450-
// concat requires input of at least one array
451-
if sliced_array.is_empty() {
452-
Ok(return_empty(return_element, array.value_type()))
453-
} else {
454-
let vec = sliced_array
455-
.iter()
456-
.map(|a| a.as_ref())
457-
.collect::<Vec<&dyn Array>>();
458-
let mut i: i32 = 0;
459-
let mut offsets = vec![i];
460-
offsets.extend(
461-
vec.iter()
462-
.map(|a| {
463-
i += a.len() as i32;
464-
i
465-
})
466-
.collect::<Vec<_>>(),
467-
);
468-
let values = compute::concat(vec.as_slice()).unwrap();
412+
// array is null
413+
if len == 0 {
414+
mutable.extend_nulls(1);
415+
continue;
416+
}
417+
418+
let index = adjusted_array_index(indexes.value(row_index), len);
469419

470-
if return_element {
471-
Ok(values)
420+
if let Some(index) = index {
421+
mutable.extend(0, start + index as usize, start + index as usize + 1);
472422
} else {
473-
let field = Arc::new(Field::new("item", array.value_type(), true));
474-
Ok(Arc::new(ListArray::try_new(
475-
field,
476-
OffsetBuffer::new(offsets.into()),
477-
values,
478-
None,
479-
)?))
423+
// Index out of bounds
424+
mutable.extend_nulls(1);
480425
}
481426
}
482-
}
483-
484-
fn define_array_slice(
485-
list_array: &ListArray,
486-
key: &Int64Array,
487-
extra_key: &Int64Array,
488-
return_element: bool,
489-
) -> Result<ArrayRef> {
490-
macro_rules! array_function {
491-
($ARRAY_TYPE:ident) => {
492-
slice::<$ARRAY_TYPE>(list_array, key, extra_key, return_element)
493-
};
494-
}
495-
call_array_function!(list_array.value_type(), true)
496-
}
497427

498-
pub fn array_element(args: &[ArrayRef]) -> Result<ArrayRef> {
499-
let list_array = as_list_array(&args[0])?;
500-
let key = as_int64_array(&args[1])?;
501-
define_array_slice(list_array, key, key, true)
428+
let data = mutable.freeze();
429+
Ok(arrow_array::make_array(data))
502430
}
503431

504432
fn general_except<OffsetSize: OffsetSizeTrait>(
@@ -579,47 +507,136 @@ pub fn array_except(args: &[ArrayRef]) -> Result<ArrayRef> {
579507
}
580508
}
581509

510+
/// array_slice SQL function
511+
///
512+
/// We follow the behavior of array_slice in DuckDB
513+
/// Note that array_slice is 1-indexed. And there are two additional arguments `from` and `to` in array_slice.
514+
///
515+
/// > array_slice(array, from, to)
516+
///
517+
/// Positive index is treated as the index from the start of the array. If the
518+
/// `from` index is smaller than 1, it is treated as 1. If the `to` index is larger than the
519+
/// length of the array, it is treated as the length of the array.
520+
///
521+
/// Negative index is treated as the index from the end of the array. If the index
522+
/// is larger than the length of the array, it is NOT VALID, either in `from` or `to`.
523+
/// The `to` index is exclusive like python slice syntax.
524+
///
525+
/// See test cases in `array.slt` for more details.
582526
pub fn array_slice(args: &[ArrayRef]) -> Result<ArrayRef> {
583527
let list_array = as_list_array(&args[0])?;
584-
let key = as_int64_array(&args[1])?;
585-
let extra_key = as_int64_array(&args[2])?;
586-
define_array_slice(list_array, key, extra_key, false)
587-
}
588-
589-
fn general_array_pop(
590-
list_array: &GenericListArray<i32>,
591-
from_back: bool,
592-
) -> Result<(Vec<i64>, Vec<i64>)> {
593-
if from_back {
594-
let key = vec![0; list_array.len()];
595-
// Attention: `arr.len() - 1` in extra key defines the last element position (position = index + 1, not inclusive) we want in the new array.
596-
let extra_key: Vec<_> = list_array
597-
.iter()
598-
.map(|x| x.map_or(0, |arr| arr.len() as i64 - 1))
599-
.collect();
600-
Ok((key, extra_key))
601-
} else {
602-
// Attention: 2 in the `key`` defines the first element position (position = index + 1) we want in the new array.
603-
// We only handle two cases of the first element index: if the old array has any elements, starts from 2 (index + 1), or starts from initial.
604-
let key: Vec<_> = list_array.iter().map(|x| x.map_or(0, |_| 2)).collect();
605-
let extra_key: Vec<_> = list_array
606-
.iter()
607-
.map(|x| x.map_or(0, |arr| arr.len() as i64))
608-
.collect();
609-
Ok((key, extra_key))
528+
let from_array = as_int64_array(&args[1])?;
529+
let to_array = as_int64_array(&args[2])?;
530+
531+
let values = list_array.values();
532+
let original_data = values.to_data();
533+
let capacity = Capacities::Array(original_data.len());
534+
535+
// use_nulls: false, we don't need nulls but empty array for array_slice, so we don't need explicit nulls but adjust offset to indicate nulls.
536+
let mut mutable =
537+
MutableArrayData::with_capacities(vec![&original_data], false, capacity);
538+
539+
// We have the slice syntax compatible with DuckDB v0.8.1.
540+
// The rule `adjusted_from_index` and `adjusted_to_index` follows the rule of array_slice in duckdb.
541+
542+
fn adjusted_from_index(index: i64, len: usize) -> Option<i64> {
543+
// 0 ~ len - 1
544+
let adjusted_zero_index = if index < 0 {
545+
index + len as i64
546+
} else {
547+
// array_slice(arr, 1, to) is the same as array_slice(arr, 0, to)
548+
std::cmp::max(index - 1, 0)
549+
};
550+
551+
if 0 <= adjusted_zero_index && adjusted_zero_index < len as i64 {
552+
Some(adjusted_zero_index)
553+
} else {
554+
// Out of bounds
555+
None
556+
}
557+
}
558+
559+
fn adjusted_to_index(index: i64, len: usize) -> Option<i64> {
560+
// 0 ~ len - 1
561+
let adjusted_zero_index = if index < 0 {
562+
// array_slice in duckdb with negative to_index is python-like, so index itself is exclusive
563+
index + len as i64 - 1
564+
} else {
565+
// array_slice(arr, from, len + 1) is the same as array_slice(arr, from, len)
566+
std::cmp::min(index - 1, len as i64 - 1)
567+
};
568+
569+
if 0 <= adjusted_zero_index && adjusted_zero_index < len as i64 {
570+
Some(adjusted_zero_index)
571+
} else {
572+
// Out of bounds
573+
None
574+
}
575+
}
576+
577+
let mut offsets = vec![0];
578+
579+
for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() {
580+
let start = offset_window[0] as usize;
581+
let end = offset_window[1] as usize;
582+
let len = end - start;
583+
584+
// len 0 indicate array is null, return empty array in this row.
585+
if len == 0 {
586+
offsets.push(offsets[row_index]);
587+
continue;
588+
}
589+
590+
// If index is null, we consider it as the minimum / maximum index of the array.
591+
let from_index = if from_array.is_null(row_index) {
592+
Some(0)
593+
} else {
594+
adjusted_from_index(from_array.value(row_index), len)
595+
};
596+
597+
let to_index = if to_array.is_null(row_index) {
598+
Some(len as i64 - 1)
599+
} else {
600+
adjusted_to_index(to_array.value(row_index), len)
601+
};
602+
603+
if let (Some(from), Some(to)) = (from_index, to_index) {
604+
if from <= to {
605+
assert!(start + to as usize <= end);
606+
mutable.extend(0, start + from as usize, start + to as usize + 1);
607+
offsets.push(offsets[row_index] + (to - from + 1) as i32);
608+
} else {
609+
// invalid range, return empty array
610+
offsets.push(offsets[row_index]);
611+
}
612+
} else {
613+
// invalid range, return empty array
614+
offsets.push(offsets[row_index]);
615+
}
610616
}
617+
618+
let data = mutable.freeze();
619+
620+
Ok(Arc::new(ListArray::try_new(
621+
Arc::new(Field::new("item", list_array.value_type(), true)),
622+
OffsetBuffer::new(offsets.into()),
623+
arrow_array::make_array(data),
624+
None,
625+
)?))
611626
}
612627

628+
/// array_pop_back SQL function
613629
pub fn array_pop_back(args: &[ArrayRef]) -> Result<ArrayRef> {
614630
let list_array = as_list_array(&args[0])?;
615-
let (key, extra_key) = general_array_pop(list_array, true)?;
616-
617-
define_array_slice(
618-
list_array,
619-
&Int64Array::from(key),
620-
&Int64Array::from(extra_key),
621-
false,
622-
)
631+
let from_array = Int64Array::from(vec![1; list_array.len()]);
632+
let to_array = Int64Array::from(
633+
list_array
634+
.iter()
635+
.map(|arr| arr.map_or(0, |arr| arr.len() as i64 - 1))
636+
.collect::<Vec<i64>>(),
637+
);
638+
let args = vec![args[0].clone(), Arc::new(from_array), Arc::new(to_array)];
639+
array_slice(args.as_slice())
623640
}
624641

625642
/// Appends or prepends elements to a ListArray.
@@ -743,16 +760,18 @@ pub fn gen_range(args: &[ArrayRef]) -> Result<ArrayRef> {
743760
Ok(arr)
744761
}
745762

763+
/// array_pop_front SQL function
746764
pub fn array_pop_front(args: &[ArrayRef]) -> Result<ArrayRef> {
747765
let list_array = as_list_array(&args[0])?;
748-
let (key, extra_key) = general_array_pop(list_array, false)?;
749-
750-
define_array_slice(
751-
list_array,
752-
&Int64Array::from(key),
753-
&Int64Array::from(extra_key),
754-
false,
755-
)
766+
let from_array = Int64Array::from(vec![2; list_array.len()]);
767+
let to_array = Int64Array::from(
768+
list_array
769+
.iter()
770+
.map(|arr| arr.map_or(0, |arr| arr.len() as i64))
771+
.collect::<Vec<i64>>(),
772+
);
773+
let args = vec![args[0].clone(), Arc::new(from_array), Arc::new(to_array)];
774+
array_slice(args.as_slice())
756775
}
757776

758777
/// Array_append SQL function

datafusion/physical-expr/src/expressions/get_indexed_field.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -453,7 +453,7 @@ mod tests {
453453
.evaluate(&batch)?
454454
.into_array(batch.num_rows())
455455
.expect("Failed to convert to array");
456-
assert!(result.is_null(0));
456+
assert!(result.is_empty());
457457
Ok(())
458458
}
459459

0 commit comments

Comments
 (0)