Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions datafusion-physical-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,6 @@ path = "src/lib.rs"
datafusion-common = { path = "../datafusion-common", version = "7.0.0" }
datafusion-expr = { path = "../datafusion-expr", version = "7.0.0" }
arrow = { version = "9.0.0", features = ["prettyprint"] }
paste = "^1.0"
ahash = { version = "0.7", default-features = false }
ordered-float = "2.10"
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@

//! Coercion rules for matching argument types for binary operators

use crate::arrow::datatypes::DataType;
use crate::error::{DataFusionError, Result};
use crate::scalar::{MAX_PRECISION_FOR_DECIMAL128, MAX_SCALE_FOR_DECIMAL128};
use arrow::datatypes::DataType;
use datafusion_common::DataFusionError;
use datafusion_common::Result;
use datafusion_common::{MAX_PRECISION_FOR_DECIMAL128, MAX_SCALE_FOR_DECIMAL128};
use datafusion_expr::Operator;

/// Coercion rules for all binary operators. Returns the output type
Expand Down Expand Up @@ -492,8 +493,9 @@ fn eq_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataType> {
#[cfg(test)]
mod tests {
use super::*;
use crate::arrow::datatypes::DataType;
use crate::error::{DataFusionError, Result};
use arrow::datatypes::DataType;
use datafusion_common::DataFusionError;
use datafusion_common::Result;
use datafusion_expr::Operator;

#[test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,4 @@
//! Aggregate function rule
//! Binary operation rule

pub(crate) mod aggregate_rule;
pub(crate) mod binary_rule;
pub mod binary_rule;
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@
//! Defines physical expressions that can be evaluated at runtime during query execution

use super::format_state_name;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::{
hyperloglog::HyperLogLog, Accumulator, AggregateExpr, PhysicalExpr,
};
use crate::scalar::ScalarValue;
use crate::{hyperloglog::HyperLogLog, AggregateExpr, PhysicalExpr};
use arrow::array::{
ArrayRef, BinaryArray, BinaryOffsetSizeTrait, GenericBinaryArray, GenericStringArray,
PrimitiveArray, StringOffsetSizeTrait,
Expand All @@ -31,6 +27,9 @@ use arrow::datatypes::{
ArrowPrimitiveType, DataType, Field, Int16Type, Int32Type, Int64Type, Int8Type,
UInt16Type, UInt32Type, UInt64Type, UInt8Type,
};
use datafusion_common::ScalarValue;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::Accumulator;
use std::any::type_name;
use std::any::Any;
use std::convert::TryFrom;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@

//! Defines physical expressions for APPROX_MEDIAN that can be evaluated at runtime during query execution

use crate::{AggregateExpr, PhysicalExpr};
use arrow::{datatypes::DataType, datatypes::Field};
use datafusion_common::Result;
use datafusion_expr::Accumulator;
use std::any::Any;
use std::sync::Arc;

use crate::error::Result;
use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
use arrow::{datatypes::DataType, datatypes::Field};

/// APPROX_MEDIAN aggregate expression
#[derive(Debug)]
pub struct ApproxMedian {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,20 @@
// specific language governing permissions and limitations
// under the License.

use std::{any::Any, iter, sync::Arc};

use super::{format_state_name, Literal};
use crate::{tdigest::TDigest, AggregateExpr, PhysicalExpr};
use arrow::{
array::{
ArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
Int8Array, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
},
datatypes::{DataType, Field},
};

use crate::{
error::DataFusionError,
physical_plan::{tdigest::TDigest, Accumulator, AggregateExpr, PhysicalExpr},
scalar::ScalarValue,
};

use crate::error::Result;

use super::{format_state_name, Literal};
use datafusion_common::DataFusionError;
use datafusion_common::Result;
use datafusion_common::ScalarValue;
use datafusion_expr::Accumulator;
use std::{any::Any, iter, sync::Arc};

/// Return `true` if `arg_type` is of a [`DataType`] that the
/// [`ApproxPercentileCont`] aggregation can operate on.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
//! Defines physical expressions that can be evaluated at runtime during query execution

use super::format_state_name;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
use crate::scalar::ScalarValue;
use crate::{AggregateExpr, PhysicalExpr};
use arrow::array::ArrayRef;
use arrow::datatypes::{DataType, Field};
use datafusion_common::ScalarValue;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::Accumulator;
use std::any::Any;
use std::sync::Arc;

Expand Down Expand Up @@ -157,18 +158,18 @@ impl Accumulator for ArrayAggAccumulator {
#[cfg(test)]
mod tests {
use super::*;
use crate::from_slice::FromSlice;
use crate::physical_plan::expressions::col;
use crate::physical_plan::expressions::tests::aggregate;
use crate::{error::Result, generic_test_op};
use crate::expressions::col;
use crate::expressions::tests::aggregate;
use crate::generic_test_op;
use arrow::array::ArrayRef;
use arrow::array::Int32Array;
use arrow::datatypes::*;
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;

#[test]
fn array_agg_i32() -> Result<()> {
let a: ArrayRef = Arc::new(Int32Array::from_slice(&[1, 2, 3, 4, 5]));
let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));

let list = ScalarValue::List(
Some(Box::new(vec![
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,18 @@ use std::any::Any;
use std::convert::TryFrom;
use std::sync::Arc;

use crate::error::{DataFusionError, Result};
use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
use crate::scalar::{
ScalarValue, MAX_PRECISION_FOR_DECIMAL128, MAX_SCALE_FOR_DECIMAL128,
};
use crate::{AggregateExpr, PhysicalExpr};
use arrow::compute;
use arrow::datatypes::DataType;
use arrow::{
array::{ArrayRef, UInt64Array},
datatypes::Field,
};
use datafusion_common::{DataFusionError, Result};
use datafusion_common::{
ScalarValue, MAX_PRECISION_FOR_DECIMAL128, MAX_SCALE_FOR_DECIMAL128,
};
use datafusion_expr::Accumulator;

use super::{format_state_name, sum};

Expand Down Expand Up @@ -70,7 +71,7 @@ pub fn avg_return_type(arg_type: &DataType) -> Result<DataType> {
}
}

pub(crate) fn is_avg_support_arg_type(arg_type: &DataType) -> bool {
pub fn is_avg_support_arg_type(arg_type: &DataType) -> bool {
matches!(
arg_type,
DataType::UInt8
Expand Down Expand Up @@ -215,11 +216,11 @@ impl Accumulator for AvgAccumulator {
#[cfg(test)]
mod tests {
use super::*;
use crate::from_slice::FromSlice;
use crate::physical_plan::expressions::col;
use crate::{error::Result, generic_test_op};
use crate::expressions::col;
use crate::generic_test_op;
use arrow::record_batch::RecordBatch;
use arrow::{array::*, datatypes::*};
use datafusion_common::Result;

#[test]
fn test_avg_return_data_type() -> Result<()> {
Expand Down Expand Up @@ -290,7 +291,7 @@ mod tests {

#[test]
fn avg_i32() -> Result<()> {
let a: ArrayRef = Arc::new(Int32Array::from_slice(&[1, 2, 3, 4, 5]));
let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
generic_test_op!(
a,
DataType::Int32,
Expand Down Expand Up @@ -332,9 +333,8 @@ mod tests {

#[test]
fn avg_u32() -> Result<()> {
let a: ArrayRef = Arc::new(UInt32Array::from_slice(&[
1_u32, 2_u32, 3_u32, 4_u32, 5_u32,
]));
let a: ArrayRef =
Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 3_u32, 4_u32, 5_u32]));
generic_test_op!(
a,
DataType::UInt32,
Expand All @@ -346,9 +346,8 @@ mod tests {

#[test]
fn avg_f32() -> Result<()> {
let a: ArrayRef = Arc::new(Float32Array::from_slice(&[
1_f32, 2_f32, 3_f32, 4_f32, 5_f32,
]));
let a: ArrayRef =
Arc::new(Float32Array::from(vec![1_f32, 2_f32, 3_f32, 4_f32, 5_f32]));
generic_test_op!(
a,
DataType::Float32,
Expand All @@ -360,9 +359,8 @@ mod tests {

#[test]
fn avg_f64() -> Result<()> {
let a: ArrayRef = Arc::new(Float64Array::from_slice(&[
1_f64, 2_f64, 3_f64, 4_f64, 5_f64,
]));
let a: ArrayRef =
Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64]));
generic_test_op!(
a,
DataType::Float64,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,12 @@ use arrow::datatypes::{ArrowNumericType, DataType, Schema, TimeUnit};
use arrow::error::ArrowError::DivideByZero;
use arrow::record_batch::RecordBatch;

use crate::error::{DataFusionError, Result};
use crate::physical_plan::coercion_rule::binary_rule::coerce_types;
use crate::physical_plan::expressions::try_cast;
use crate::physical_plan::{ColumnarValue, PhysicalExpr};
use crate::scalar::ScalarValue;
use crate::coercion_rule::binary_rule::coerce_types;
use crate::expressions::try_cast;
use crate::PhysicalExpr;
use datafusion_common::ScalarValue;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::ColumnarValue;
use datafusion_expr::Operator;

// TODO move to arrow_rs
Expand Down Expand Up @@ -1375,11 +1376,10 @@ pub fn binary(
#[cfg(test)]
mod tests {
use super::*;
use crate::error::Result;
use crate::from_slice::FromSlice;
use crate::physical_plan::expressions::{col, lit};
use crate::expressions::{col, lit};
use arrow::datatypes::{ArrowNumericType, Field, Int32Type, SchemaRef};
use arrow::util::display::array_value_to_string;
use datafusion_common::Result;

// Create a binary expression without coercion. Used here when we do not want to coerce the expressions
// to valid types. Usage can result in an execution (after plan) error.
Expand All @@ -1398,8 +1398,8 @@ mod tests {
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]);
let a = Int32Array::from_slice(&[1, 2, 3, 4, 5]);
let b = Int32Array::from_slice(&[1, 2, 4, 8, 16]);
let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
let b = Int32Array::from(vec![1, 2, 4, 8, 16]);

// expression: "a < b"
let lt = binary_simple(
Expand Down Expand Up @@ -1432,8 +1432,8 @@ mod tests {
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]);
let a = Int32Array::from_slice(&[2, 4, 6, 8, 10]);
let b = Int32Array::from_slice(&[2, 5, 4, 8, 8]);
let a = Int32Array::from(vec![2, 4, 6, 8, 10]);
let b = Int32Array::from(vec![2, 5, 4, 8, 8]);

// expression: "a < b OR a == b"
let expr = binary_simple(
Expand Down Expand Up @@ -1831,14 +1831,14 @@ mod tests {
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]);
let a = Int32Array::from_slice(&[1, 2, 3, 4, 5]);
let b = Int32Array::from_slice(&[1, 2, 4, 8, 16]);
let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
let b = Int32Array::from(vec![1, 2, 4, 8, 16]);

apply_arithmetic::<Int32Type>(
Arc::new(schema),
vec![Arc::new(a), Arc::new(b)],
Operator::Plus,
Int32Array::from_slice(&[2, 4, 7, 12, 21]),
Int32Array::from(vec![2, 4, 7, 12, 21]),
)?;

Ok(())
Expand All @@ -1850,22 +1850,22 @@ mod tests {
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]));
let a = Arc::new(Int32Array::from_slice(&[1, 2, 4, 8, 16]));
let b = Arc::new(Int32Array::from_slice(&[1, 2, 3, 4, 5]));
let a = Arc::new(Int32Array::from(vec![1, 2, 4, 8, 16]));
let b = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));

apply_arithmetic::<Int32Type>(
schema.clone(),
vec![a.clone(), b.clone()],
Operator::Minus,
Int32Array::from_slice(&[0, 0, 1, 4, 11]),
Int32Array::from(vec![0, 0, 1, 4, 11]),
)?;

// should handle negative values in the result (for signed types)
apply_arithmetic::<Int32Type>(
schema,
vec![b, a],
Operator::Minus,
Int32Array::from_slice(&[0, 0, -1, -4, -11]),
Int32Array::from(vec![0, 0, -1, -4, -11]),
)?;

Ok(())
Expand All @@ -1877,14 +1877,14 @@ mod tests {
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]));
let a = Arc::new(Int32Array::from_slice(&[4, 8, 16, 32, 64]));
let b = Arc::new(Int32Array::from_slice(&[2, 4, 8, 16, 32]));
let a = Arc::new(Int32Array::from(vec![4, 8, 16, 32, 64]));
let b = Arc::new(Int32Array::from(vec![2, 4, 8, 16, 32]));

apply_arithmetic::<Int32Type>(
schema,
vec![a, b],
Operator::Multiply,
Int32Array::from_slice(&[8, 32, 128, 512, 2048]),
Int32Array::from(vec![8, 32, 128, 512, 2048]),
)?;

Ok(())
Expand All @@ -1896,14 +1896,14 @@ mod tests {
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]));
let a = Arc::new(Int32Array::from_slice(&[8, 32, 128, 512, 2048]));
let b = Arc::new(Int32Array::from_slice(&[2, 4, 8, 16, 32]));
let a = Arc::new(Int32Array::from(vec![8, 32, 128, 512, 2048]));
let b = Arc::new(Int32Array::from(vec![2, 4, 8, 16, 32]));

apply_arithmetic::<Int32Type>(
schema,
vec![a, b],
Operator::Divide,
Int32Array::from_slice(&[4, 8, 16, 32, 64]),
Int32Array::from(vec![4, 8, 16, 32, 64]),
)?;

Ok(())
Expand All @@ -1915,14 +1915,14 @@ mod tests {
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]));
let a = Arc::new(Int32Array::from_slice(&[8, 32, 128, 512, 2048]));
let b = Arc::new(Int32Array::from_slice(&[2, 4, 7, 14, 32]));
let a = Arc::new(Int32Array::from(vec![8, 32, 128, 512, 2048]));
let b = Arc::new(Int32Array::from(vec![2, 4, 7, 14, 32]));

apply_arithmetic::<Int32Type>(
schema,
vec![a, b],
Operator::Modulo,
Int32Array::from_slice(&[0, 0, 2, 8, 0]),
Int32Array::from(vec![0, 0, 2, 8, 0]),
)?;

Ok(())
Expand Down
Loading