Skip to content

Commit 98afb20

Browse files
committed
rebase
Signed-off-by: jayzhan211 <[email protected]>
1 parent 1a40084 commit 98afb20

File tree

3 files changed

+82
-87
lines changed

3 files changed

+82
-87
lines changed

datafusion/common/src/scalar.rs

Lines changed: 30 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ use arrow::{
5252
},
5353
};
5454
use arrow_array::cast::as_list_array;
55-
use arrow_array::types::ArrowTimestampType;
5655
use arrow_array::{ArrowNativeTypeOp, Scalar};
5756
use arrow_buffer::{Buffer, NullBuffer};
5857

@@ -376,31 +375,7 @@ impl PartialOrd for ScalarValue {
376375
}
377376
(List(_), _) | (LargeList(_), _) | (FixedSizeList(_), _) => None,
378377
(Struct(struct_arr1), Struct(struct_arr2)) => {
379-
if struct_arr1.len() != struct_arr2.len() {
380-
return None;
381-
}
382-
383-
if struct_arr1.data_type() != struct_arr2.data_type() {
384-
return None;
385-
}
386-
387-
for col_index in 0..struct_arr1.num_columns() {
388-
let arr1 = struct_arr1.column(col_index);
389-
let arr2 = struct_arr2.column(col_index);
390-
391-
let lt_res = arrow::compute::kernels::cmp::lt(arr1, arr2).ok()?;
392-
let eq_res = arrow::compute::kernels::cmp::eq(arr1, arr2).ok()?;
393-
394-
for j in 0..lt_res.len() {
395-
if lt_res.is_valid(j) && lt_res.value(j) {
396-
return Some(Ordering::Less);
397-
}
398-
if eq_res.is_valid(j) && !eq_res.value(j) {
399-
return Some(Ordering::Greater);
400-
}
401-
}
402-
}
403-
Some(Ordering::Equal)
378+
partial_cmp_struct(struct_arr1, struct_arr2)
404379
}
405380
(Struct(_), _) => None,
406381
(Date32(v1), Date32(v2)) => v1.partial_cmp(v2),
@@ -496,6 +471,34 @@ fn partial_cmp_list(arr1: &dyn Array, arr2: &dyn Array) -> Option<Ordering> {
496471
Some(Ordering::Equal)
497472
}
498473

474+
fn partial_cmp_struct(s1: &Arc<StructArray>, s2: &Arc<StructArray>) -> Option<Ordering> {
475+
if s1.len() != s2.len() {
476+
return None;
477+
}
478+
479+
if s1.data_type() != s2.data_type() {
480+
return None;
481+
}
482+
483+
for col_index in 0..s1.num_columns() {
484+
let arr1 = s1.column(col_index);
485+
let arr2 = s2.column(col_index);
486+
487+
let lt_res = arrow::compute::kernels::cmp::lt(arr1, arr2).ok()?;
488+
let eq_res = arrow::compute::kernels::cmp::eq(arr1, arr2).ok()?;
489+
490+
for j in 0..lt_res.len() {
491+
if lt_res.is_valid(j) && lt_res.value(j) {
492+
return Some(Ordering::Less);
493+
}
494+
if eq_res.is_valid(j) && !eq_res.value(j) {
495+
return Some(Ordering::Greater);
496+
}
497+
}
498+
}
499+
Some(Ordering::Equal)
500+
}
501+
499502
impl Eq for ScalarValue {}
500503

501504
//Float wrapper over f32/f64. Just because we cannot build std::hash::Hash for floats directly we have to do it through type wrapper
@@ -3324,21 +3327,13 @@ mod tests {
33243327
use crate::cast::{as_string_array, as_uint32_array, as_uint64_array};
33253328

33263329
use arrow::buffer::OffsetBuffer;
3327-
use arrow::compute::{concat, is_null, kernels};
3330+
use arrow::compute::{is_null, kernels};
33283331
use arrow::datatypes::{ArrowNumericType, ArrowPrimitiveType};
33293332
use arrow::util::pretty::pretty_format_columns;
33303333
use arrow_buffer::Buffer;
33313334
use chrono::NaiveDate;
33323335
use rand::Rng;
33333336

3334-
use arrow::compute::kernels;
3335-
use arrow::datatypes::ArrowPrimitiveType;
3336-
use arrow::{buffer::OffsetBuffer, compute::is_null};
3337-
use arrow_array::ArrowNumericType;
3338-
3339-
use chrono::NaiveDate;
3340-
use rand::Rng;
3341-
33423337
#[test]
33433338
fn test_scalar_value_from_for_struct() {
33443339
let boolean = Arc::new(BooleanArray::from(vec![false]));

datafusion/physical-expr/src/aggregate/array_agg_ordered.rs

Lines changed: 15 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,9 @@ use crate::{
3333
use arrow::array::{Array, ArrayRef};
3434
use arrow::datatypes::{DataType, Field};
3535
use arrow_array::cast::AsArray;
36-
use arrow_array::Array;
3736
use arrow_array::{new_empty_array, StructArray};
3837
use arrow_schema::{Fields, SortOptions};
3938

40-
use datafusion_common::cast::as_list_array;
41-
use datafusion_common::internal_err;
4239
use datafusion_common::utils::array_into_list_array;
4340
use datafusion_common::utils::{compare_rows, get_row_at_idx};
4441
use datafusion_common::{exec_err, DataFusionError, Result, ScalarValue};
@@ -245,22 +242,17 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
245242
let mut partition_ordering_values = vec![];
246243

247244
// Existing values should be merged also.
248-
partition_values.push(self.values.clone());
249-
partition_ordering_values.push(self.ordering_values.clone());
245+
partition_values.push(self.values.clone().into());
246+
partition_ordering_values.push(self.ordering_values.clone().into());
250247

251248
// Convert array to Scalars to sort them easily. Convert back to array at evaluation.
252249
let array_agg_res = ScalarValue::convert_array_to_scalar_vec(array_agg_values)?;
253-
partition_values.extend(array_agg_res);
250+
for v in array_agg_res.into_iter() {
251+
partition_values.push(v.into());
252+
}
254253

255254
let orderings = ScalarValue::convert_array_to_scalar_vec(agg_orderings)?;
256255

257-
for partition_ordering_rows in orderings.into_iter() {
258-
// Extract value from struct to ordering_rows for each group/partition
259-
let ordering_value = partition_ordering_rows.into_iter().map(|ordering_row| {
260-
if let ScalarValue::Struct(s) = ordering_row {
261-
let mut ordering_columns_per_row = vec![];
262-
263-
let orderings = ScalarValue::convert_array_to_scalar_vec(agg_orderings)?;
264256
for partition_ordering_rows in orderings.into_iter() {
265257
// Extract value from struct to ordering_rows for each group/partition
266258
let ordering_value = partition_ordering_rows.into_iter().map(|ordering_row| {
@@ -274,13 +266,13 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
274266

275267
Ok(ordering_columns_per_row)
276268
} else {
277-
internal_err!(
269+
exec_err!(
278270
"Expects to receive ScalarValue::Struct(Arc<StructArray>) but got:{:?}",
279271
ordering_row.data_type()
280272
)
281273
}
282-
}).collect::<Result<Vec<_>>>()?;
283-
// }).collect::<Result<Vec<_>>>()?;
274+
}).collect::<Result<VecDeque<_>>>()?;
275+
284276
partition_ordering_values.push(ordering_value);
285277
}
286278

@@ -289,13 +281,12 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
289281
.iter()
290282
.map(|sort_expr| sort_expr.options)
291283
.collect::<Vec<_>>();
292-
let (new_values, new_orderings) = merge_ordered_arrays(
293-
&partition_values,
294-
&partition_ordering_values,
284+
285+
(self.values, self.ordering_values) = merge_ordered_arrays(
286+
&mut partition_values,
287+
&mut partition_ordering_values,
295288
&sort_options,
296289
)?;
297-
self.values = new_values;
298-
self.ordering_values = new_orderings;
299290

300291
Ok(())
301292
}
@@ -314,10 +305,6 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
314305
ScalarValue::new_list_from_iter(values.into_iter(), &self.datatypes[0])
315306
};
316307
Ok(ScalarValue::List(array))
317-
// Ok(ScalarValue::List(ScalarValue::new_list(
318-
// &self.values,
319-
// &self.datatypes[0],
320-
// )))
321308
}
322309

323310
fn size(&self) -> usize {
@@ -348,18 +335,10 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
348335
impl OrderSensitiveArrayAggAccumulator {
349336
fn evaluate_orderings(&self) -> Result<ScalarValue> {
350337
let fields = ordering_fields(&self.ordering_req, &self.datatypes[1..]);
351-
let struct_field = Fields::from(fields);
352-
353-
// let orderings: Vec<ScalarValue> = self
354-
// .ordering_values
355-
// .iter()
356-
// .map(|ordering| {
357-
// ScalarValue::Struct(Some(ordering.clone()), struct_field.clone())
358-
// })
359-
// .collect();
360-
// let struct_type = DataType::Struct(struct_field);
361-
let mut column_wise_ordering_values = vec![];
362338
let num_columns = fields.len();
339+
let struct_field = Fields::from(fields.clone());
340+
341+
let mut column_wise_ordering_values = vec![];
363342
for i in 0..num_columns {
364343
let column_values = self
365344
.ordering_values

datafusion/physical-expr/src/aggregate/nth_value.rs

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ use crate::{
3030
};
3131

3232
use arrow_array::cast::AsArray;
33-
use arrow_array::ArrayRef;
33+
use arrow_array::{new_empty_array, ArrayRef, StructArray};
3434
use arrow_schema::{DataType, Field, Fields};
35-
use datafusion_common::utils::get_row_at_idx;
35+
use datafusion_common::utils::{array_into_list_array, get_row_at_idx};
3636
use datafusion_common::{exec_err, internal_err, DataFusionError, Result, ScalarValue};
3737
use datafusion_expr::Accumulator;
3838

@@ -270,7 +270,14 @@ impl Accumulator for NthValueAccumulator {
270270
let ordering_values = orderings.into_iter().map(|partition_ordering_rows| {
271271
// Extract value from struct to ordering_rows for each group/partition
272272
partition_ordering_rows.into_iter().map(|ordering_row| {
273-
if let ScalarValue::Struct(Some(ordering_columns_per_row), _) = ordering_row {
273+
if let ScalarValue::Struct(s) = ordering_row {
274+
let mut ordering_columns_per_row = vec![];
275+
276+
for column in s.columns() {
277+
let sv = ScalarValue::try_from_array(column, 0)?;
278+
ordering_columns_per_row.push(sv);
279+
}
280+
274281
Ok(ordering_columns_per_row)
275282
} else {
276283
exec_err!(
@@ -305,7 +312,7 @@ impl Accumulator for NthValueAccumulator {
305312
fn state(&mut self) -> Result<Vec<ScalarValue>> {
306313
let mut result = vec![self.evaluate_values()];
307314
if !self.ordering_req.is_empty() {
308-
result.push(self.evaluate_orderings());
315+
result.push(self.evaluate_orderings()?);
309316
}
310317
Ok(result)
311318
}
@@ -354,21 +361,35 @@ impl Accumulator for NthValueAccumulator {
354361
}
355362

356363
impl NthValueAccumulator {
357-
fn evaluate_orderings(&self) -> ScalarValue {
364+
fn evaluate_orderings(&self) -> Result<ScalarValue> {
358365
let fields = ordering_fields(&self.ordering_req, &self.datatypes[1..]);
359-
let struct_field = Fields::from(fields);
366+
let struct_field = Fields::from(fields.clone());
360367

361-
let orderings = self
362-
.ordering_values
363-
.iter()
364-
.map(|ordering| {
365-
ScalarValue::Struct(Some(ordering.clone()), struct_field.clone())
366-
})
367-
.collect::<Vec<_>>();
368-
let struct_type = DataType::Struct(struct_field);
368+
let mut column_wise_ordering_values = vec![];
369+
let num_columns = fields.len();
370+
for i in 0..num_columns {
371+
let column_values = self
372+
.ordering_values
373+
.iter()
374+
.map(|x| x[i].clone())
375+
.collect::<Vec<_>>();
376+
let array = if column_values.is_empty() {
377+
new_empty_array(fields[i].data_type())
378+
} else {
379+
ScalarValue::iter_to_array(column_values.into_iter())?
380+
};
381+
column_wise_ordering_values.push(array);
382+
}
383+
384+
let ordering_array = StructArray::try_new(
385+
struct_field.clone(),
386+
column_wise_ordering_values,
387+
None,
388+
)?;
369389

370-
// Wrap in List, so we have the same data structure ListArray(StructArray..) for group by cases
371-
ScalarValue::List(ScalarValue::new_list(&orderings, &struct_type))
390+
Ok(ScalarValue::List(Arc::new(array_into_list_array(
391+
Arc::new(ordering_array),
392+
))))
372393
}
373394

374395
fn evaluate_values(&self) -> ScalarValue {

0 commit comments

Comments
 (0)