Skip to content

Commit 340ecfd

Browse files
jayzhan211 and alamb authored
Avoid concat for array_replace (#8337)
* add benchmark
  Signed-off-by: jayzhan211 <[email protected]>
* fmt
  Signed-off-by: jayzhan211 <[email protected]>
* address clippy
  Signed-off-by: jayzhan211 <[email protected]>
* cleanup
  Signed-off-by: jayzhan211 <[email protected]>
* fix comment
  Signed-off-by: jayzhan211 <[email protected]>

---------

Signed-off-by: jayzhan211 <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
1 parent 3b29837 commit 340ecfd

File tree

3 files changed

+135
-67
lines changed

3 files changed

+135
-67
lines changed

datafusion/core/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,3 +167,7 @@ name = "sort"
167167
[[bench]]
168168
harness = false
169169
name = "topk_aggregate"
170+
171+
[[bench]]
172+
harness = false
173+
name = "array_expression"
datafusion/core/benches/array_expression.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#[macro_use]
19+
extern crate criterion;
20+
extern crate arrow;
21+
extern crate datafusion;
22+
23+
mod data_utils;
24+
use crate::criterion::Criterion;
25+
use arrow_array::cast::AsArray;
26+
use arrow_array::types::Int64Type;
27+
use arrow_array::{ArrayRef, Int64Array, ListArray};
28+
use datafusion_physical_expr::array_expressions;
29+
use std::sync::Arc;
30+
31+
fn criterion_benchmark(c: &mut Criterion) {
32+
// Construct large arrays for benchmarking
33+
34+
let array_len = 100000000;
35+
36+
let array = (0..array_len).map(|_| Some(2_i64)).collect::<Vec<_>>();
37+
let list_array = ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
38+
Some(array.clone()),
39+
Some(array.clone()),
40+
Some(array),
41+
]);
42+
let from_array = Int64Array::from_value(2, 3);
43+
let to_array = Int64Array::from_value(-2, 3);
44+
45+
let args = vec![
46+
Arc::new(list_array) as ArrayRef,
47+
Arc::new(from_array) as ArrayRef,
48+
Arc::new(to_array) as ArrayRef,
49+
];
50+
51+
let array = (0..array_len).map(|_| Some(-2_i64)).collect::<Vec<_>>();
52+
let expected_array = ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
53+
Some(array.clone()),
54+
Some(array.clone()),
55+
Some(array),
56+
]);
57+
58+
// Benchmark array functions
59+
60+
c.bench_function("array_replace", |b| {
61+
b.iter(|| {
62+
assert_eq!(
63+
array_expressions::array_replace_all(args.as_slice())
64+
.unwrap()
65+
.as_list::<i32>(),
66+
criterion::black_box(&expected_array)
67+
)
68+
})
69+
});
70+
}
71+
72+
criterion_group!(benches, criterion_benchmark);
73+
criterion_main!(benches);

datafusion/physical-expr/src/array_expressions.rs

Lines changed: 58 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,7 @@ use datafusion_common::cast::{
3535
};
3636
use datafusion_common::utils::{array_into_list_array, list_ndims};
3737
use datafusion_common::{
38-
exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err,
39-
DataFusionError, Result,
38+
exec_err, internal_err, not_impl_err, plan_err, DataFusionError, Result,
4039
};
4140

4241
use itertools::Itertools;
@@ -1320,84 +1319,76 @@ fn general_replace(
13201319
) -> Result<ArrayRef> {
13211320
// Build up the offsets for the final output array
13221321
let mut offsets: Vec<i32> = vec![0];
1323-
let data_type = list_array.value_type();
1324-
let mut new_values = vec![];
1322+
let values = list_array.values();
1323+
let original_data = values.to_data();
1324+
let to_data = to_array.to_data();
1325+
let capacity = Capacities::Array(original_data.len());
13251326

1326-
// n is the number of elements to replace in this row
1327-
for (row_index, (list_array_row, n)) in
1328-
list_array.iter().zip(arr_n.iter()).enumerate()
1329-
{
1330-
let last_offset: i32 = offsets
1331-
.last()
1332-
.copied()
1333-
.ok_or_else(|| internal_datafusion_err!("offsets should not be empty"))?;
1327+
// First array is the original array, second array is the element to replace with.
1328+
let mut mutable = MutableArrayData::with_capacities(
1329+
vec![&original_data, &to_data],
1330+
false,
1331+
capacity,
1332+
);
13341333

1335-
match list_array_row {
1336-
Some(list_array_row) => {
1337-
// Compute all positions in list_row_array (that is itself an
1338-
// array) that are equal to `from_array_row`
1339-
let eq_array = compare_element_to_list(
1340-
&list_array_row,
1341-
&from_array,
1342-
row_index,
1343-
true,
1344-
)?;
1334+
let mut valid = BooleanBufferBuilder::new(list_array.len());
13451335

1346-
// Use MutableArrayData to build the replaced array
1347-
let original_data = list_array_row.to_data();
1348-
let to_data = to_array.to_data();
1349-
let capacity = Capacities::Array(original_data.len() + to_data.len());
1336+
for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() {
1337+
if list_array.is_null(row_index) {
1338+
offsets.push(offsets[row_index]);
1339+
valid.append(false);
1340+
continue;
1341+
}
13501342

1351-
// First array is the original array, second array is the element to replace with.
1352-
let mut mutable = MutableArrayData::with_capacities(
1353-
vec![&original_data, &to_data],
1354-
false,
1355-
capacity,
1356-
);
1357-
let original_idx = 0;
1358-
let replace_idx = 1;
1359-
1360-
let mut counter = 0;
1361-
for (i, to_replace) in eq_array.iter().enumerate() {
1362-
if let Some(true) = to_replace {
1363-
mutable.extend(replace_idx, row_index, row_index + 1);
1364-
counter += 1;
1365-
if counter == *n {
1366-
// copy original data for any matches past n
1367-
mutable.extend(original_idx, i + 1, eq_array.len());
1368-
break;
1369-
}
1370-
} else {
1371-
// copy original data for false / null matches
1372-
mutable.extend(original_idx, i, i + 1);
1373-
}
1374-
}
1343+
let start = offset_window[0] as usize;
1344+
let end = offset_window[1] as usize;
13751345

1376-
let data = mutable.freeze();
1377-
let replaced_array = arrow_array::make_array(data);
1346+
let list_array_row = list_array.value(row_index);
13781347

1379-
offsets.push(last_offset + replaced_array.len() as i32);
1380-
new_values.push(replaced_array);
1381-
}
1382-
None => {
1383-
// Null element results in a null row (no new offsets)
1384-
offsets.push(last_offset);
1348+
// Compute all positions in list_row_array (that is itself an
1349+
// array) that are equal to `from_array_row`
1350+
let eq_array =
1351+
compare_element_to_list(&list_array_row, &from_array, row_index, true)?;
1352+
1353+
let original_idx = 0;
1354+
let replace_idx = 1;
1355+
let n = arr_n[row_index];
1356+
let mut counter = 0;
1357+
1358+
// All elements are false, no need to replace, just copy original data
1359+
if eq_array.false_count() == eq_array.len() {
1360+
mutable.extend(original_idx, start, end);
1361+
offsets.push(offsets[row_index] + (end - start) as i32);
1362+
valid.append(true);
1363+
continue;
1364+
}
1365+
1366+
for (i, to_replace) in eq_array.iter().enumerate() {
1367+
if let Some(true) = to_replace {
1368+
mutable.extend(replace_idx, row_index, row_index + 1);
1369+
counter += 1;
1370+
if counter == n {
1371+
// copy original data for any matches past n
1372+
mutable.extend(original_idx, start + i + 1, end);
1373+
break;
1374+
}
1375+
} else {
1376+
// copy original data for false / null matches
1377+
mutable.extend(original_idx, start + i, start + i + 1);
13851378
}
13861379
}
1380+
1381+
offsets.push(offsets[row_index] + (end - start) as i32);
1382+
valid.append(true);
13871383
}
13881384

1389-
let values = if new_values.is_empty() {
1390-
new_empty_array(&data_type)
1391-
} else {
1392-
let new_values: Vec<_> = new_values.iter().map(|a| a.as_ref()).collect();
1393-
arrow::compute::concat(&new_values)?
1394-
};
1385+
let data = mutable.freeze();
13951386

13961387
Ok(Arc::new(ListArray::try_new(
1397-
Arc::new(Field::new("item", data_type, true)),
1388+
Arc::new(Field::new("item", list_array.value_type(), true)),
13981389
OffsetBuffer::new(offsets.into()),
1399-
values,
1400-
list_array.nulls().cloned(),
1390+
arrow_array::make_array(data),
1391+
Some(NullBuffer::new(valid.finish())),
14011392
)?))
14021393
}
14031394

0 commit comments

Comments
 (0)