Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions ballista/rust/core/src/serde/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,14 @@ impl AsLogicalPlan for LogicalPlanNode {
))),
})
}
LogicalPlan::Subquery(_) => {
// note that the ballista and datafusion proto files need refactoring to allow
// LogicalExprNode to reference a LogicalPlanNode
// see https://github.com/apache/arrow-datafusion/issues/2338
Err(BallistaError::NotImplemented(
"Ballista does not support subqueries".to_string(),
))
}
LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
let input: protobuf::LogicalPlanNode =
protobuf::LogicalPlanNode::try_from_logical_plan(
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ impl ExpressionVisitor for ApplicabilityVisitor<'_> {
| Expr::BinaryExpr { .. }
| Expr::Between { .. }
| Expr::InList { .. }
| Expr::Exists { .. }
| Expr::GetIndexedField { .. }
| Expr::Case { .. } => Recursion::Continue(self),

Expand Down
27 changes: 27 additions & 0 deletions datafusion/core/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1202,8 +1202,10 @@ pub(crate) fn expand_qualified_wildcard(
#[cfg(test)]
mod tests {
use arrow::datatypes::{DataType, Field};
use datafusion_expr::expr_fn::exists;

use crate::logical_plan::StringifiedPlan;
use crate::test::test_table_scan_with_name;

use super::super::{col, lit, sum};
use super::*;
Expand Down Expand Up @@ -1339,6 +1341,31 @@ mod tests {
Ok(())
}

#[test]
fn exists_subquery() -> Result<()> {
    // Two scans: `foo` feeds the subquery, `bar` feeds the outer query.
    let inner_scan = test_table_scan_with_name("foo")?;
    let outer_scan = test_table_scan_with_name("bar")?;

    // The subquery filter refers to `bar.a`, a column of the outer query.
    let inner_predicate = col("a").eq(col("bar.a"));
    let correlated = LogicalPlanBuilder::from(inner_scan)
        .project(vec![col("a")])?
        .filter(inner_predicate)?
        .build()?;

    // Outer query keeps only rows for which the subquery EXISTS.
    let exists_predicate = exists(Arc::new(correlated));
    let plan = LogicalPlanBuilder::from(outer_scan)
        .project(vec![col("a")])?
        .filter(exists_predicate)?
        .build()?;

    // The Debug rendering of the plan is expected to embed the subquery plan
    // inside the `EXISTS (...)` expression of the outer filter.
    let actual = format!("{:?}", plan);
    let expected = "Filter: EXISTS (\
        Subquery: Filter: #foo.a = #bar.a\
        \n Projection: #foo.a\
        \n TableScan: foo projection=None)\
        \n Projection: #bar.a\n TableScan: bar projection=None";
    assert_eq!(expected, actual);

    Ok(())
}

#[test]
fn projection_non_unique_names() -> Result<()> {
let plan = LogicalPlanBuilder::scan_empty(
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/logical_plan/expr_rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ impl ExprRewritable for Expr {
let expr = match self {
Expr::Alias(expr, name) => Expr::Alias(rewrite_boxed(expr, rewriter)?, name),
Expr::Column(_) => self.clone(),
Expr::Exists(_) => self.clone(),
Expr::ScalarVariable(ty, names) => Expr::ScalarVariable(ty, names),
Expr::Literal(value) => Expr::Literal(value),
Expr::BinaryExpr { left, op, right } => Expr::BinaryExpr {
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/logical_plan/expr_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ impl ExprVisitable for Expr {
Expr::Column(_)
| Expr::ScalarVariable(_, _)
| Expr::Literal(_)
| Expr::Exists(_)
| Expr::Wildcard
| Expr::QualifiedWildcard { .. } => Ok(visitor),
Expr::BinaryExpr { left, right, .. } => {
Expand Down
18 changes: 9 additions & 9 deletions datafusion/core/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ pub use expr::{
abs, acos, and, approx_distinct, approx_percentile_cont, array, ascii, asin, atan,
avg, bit_length, btrim, call_fn, case, ceil, character_length, chr, coalesce, col,
columnize_expr, combine_filters, concat, concat_expr, concat_ws, concat_ws_expr, cos,
count, count_distinct, create_udaf, create_udf, date_part, date_trunc, digest, exp,
exprlist_to_fields, floor, in_list, initcap, left, length, lit, lit_timestamp_nano,
ln, log10, log2, lower, lpad, ltrim, max, md5, min, now, now_expr, nullif,
octet_length, or, random, regexp_match, regexp_replace, repeat, replace, reverse,
right, round, rpad, rtrim, sha224, sha256, sha384, sha512, signum, sin, split_part,
sqrt, starts_with, strpos, substr, sum, tan, to_hex, to_timestamp_micros,
to_timestamp_millis, to_timestamp_seconds, translate, trim, trunc, unalias, upper,
when, Column, Expr, ExprSchema, Literal,
count, count_distinct, create_udaf, create_udf, date_part, date_trunc, digest,
exists, exp, exprlist_to_fields, floor, in_list, initcap, left, length, lit,
lit_timestamp_nano, ln, log10, log2, lower, lpad, ltrim, max, md5, min, now,
now_expr, nullif, octet_length, or, random, regexp_match, regexp_replace, repeat,
replace, reverse, right, round, rpad, rtrim, sha224, sha256, sha384, sha512, signum,
sin, split_part, sqrt, starts_with, strpos, substr, sum, tan, to_hex,
to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate, trim,
trunc, unalias, upper, when, Column, Expr, ExprSchema, Literal,
};
pub use expr_rewriter::{
normalize_col, normalize_cols, replace_col, rewrite_sort_cols_by_aggs,
Expand All @@ -60,6 +60,6 @@ pub use plan::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
CrossJoin, DropTable, EmptyRelation, FileType, JoinConstraint, JoinType, Limit,
LogicalPlan, Partitioning, PlanType, PlanVisitor, Repartition, StringifiedPlan,
TableScan, ToStringifiedPlan, Union, UserDefinedLogicalNode, Values,
Subquery, TableScan, ToStringifiedPlan, Union, UserDefinedLogicalNode, Values,
};
pub use registry::FunctionRegistry;
2 changes: 1 addition & 1 deletion datafusion/core/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub use crate::logical_expr::{
CreateMemoryTable, CrossJoin, DropTable, EmptyRelation, Explain, Extension,
FileType, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan,
Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
StringifiedPlan, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
UserDefinedLogicalNode, Values, Window,
},
TableProviderFilterPushDown, TableSource,
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/optimizer/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
| LogicalPlan::TableScan { .. }
| LogicalPlan::Values(_)
| LogicalPlan::EmptyRelation(_)
| LogicalPlan::Subquery(_)
| LogicalPlan::SubqueryAlias(_)
| LogicalPlan::Limit(_)
| LogicalPlan::CreateExternalTable(_)
Expand Down Expand Up @@ -459,6 +460,9 @@ impl ExprIdentifierVisitor<'_> {
desc.push_str("InList-");
desc.push_str(&negated.to_string());
}
Expr::Exists(_) => {
desc.push_str("Exists-");
}
Expr::Wildcard => {
desc.push_str("Wildcard-");
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/optimizer/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ fn optimize_plan(
| LogicalPlan::Filter { .. }
| LogicalPlan::Repartition(_)
| LogicalPlan::EmptyRelation(_)
| LogicalPlan::Subquery(_)
| LogicalPlan::Values(_)
| LogicalPlan::Sort { .. }
| LogicalPlan::CreateExternalTable(_)
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/optimizer/simplify_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ impl<'a> ConstEvaluator<'a> {
| Expr::AggregateUDF { .. }
| Expr::ScalarVariable(_, _)
| Expr::Column(_)
| Expr::Exists(_)
| Expr::WindowFunction { .. }
| Expr::Sort { .. }
| Expr::Wildcard
Expand Down
14 changes: 12 additions & 2 deletions datafusion/core/src/optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@

use super::optimizer::OptimizerRule;
use crate::execution::context::ExecutionProps;
use crate::logical_plan::plan::{
Aggregate, Analyze, Extension, Filter, Join, Projection, Sort, SubqueryAlias, Window,
use datafusion_expr::logical_plan::{
Aggregate, Analyze, Extension, Filter, Join, Projection, Sort, Subquery,
SubqueryAlias, Window,
};

use crate::logical_plan::{
Expand Down Expand Up @@ -84,6 +85,7 @@ impl ExpressionVisitor for ColumnNameVisitor<'_> {
| Expr::AggregateFunction { .. }
| Expr::AggregateUDF { .. }
| Expr::InList { .. }
| Expr::Exists(_)
| Expr::Wildcard
| Expr::QualifiedWildcard { .. }
| Expr::GetIndexedField { .. } => {}
Expand Down Expand Up @@ -223,6 +225,12 @@ pub fn from_plan(
let right = &inputs[1];
LogicalPlanBuilder::from(left).cross_join(right)?.build()
}
LogicalPlan::Subquery(_) => {
let subquery = LogicalPlanBuilder::from(inputs[0].clone()).build()?;
Ok(LogicalPlan::Subquery(Subquery {
subquery: Arc::new(subquery),
}))
}
LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
let schema = inputs[0].schema().as_ref().clone().into();
let schema =
Expand Down Expand Up @@ -363,6 +371,7 @@ pub fn expr_sub_expressions(expr: &Expr) -> Result<Vec<Expr>> {
}
Ok(expr_list)
}
Expr::Exists(_) => Ok(vec![]),
Expr::Wildcard { .. } => Err(DataFusionError::Internal(
"Wildcard expressions are not valid in a logical query plan".to_owned(),
)),
Expand Down Expand Up @@ -497,6 +506,7 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
Expr::Column(_)
| Expr::Literal(_)
| Expr::InList { .. }
| Expr::Exists(_)
| Expr::ScalarVariable(_, _) => Ok(expr.clone()),
Expr::Sort {
asc, nulls_first, ..
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
Ok(format!("{} IN ({:?})", expr, list))
}
}
Expr::Exists(_) => Err(DataFusionError::NotImplemented(
"EXISTS is not yet supported in the physical plan".to_string(),
)),
Expr::Between {
expr,
negated,
Expand Down Expand Up @@ -780,6 +783,7 @@ impl DefaultPhysicalPlanner {
let right = self.create_initial_plan(right, session_state).await?;
Ok(Arc::new(CrossJoinExec::try_new(left, right)?))
}
// Physical planning of subquery plan nodes is not implemented. Report that as a
// regular error so callers can handle it, instead of panicking through `todo!()`.
LogicalPlan::Subquery(_) => Err(DataFusionError::NotImplemented(
    "Subquery is not yet supported in the physical plan".to_string(),
)),
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row,
schema,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub use crate::execution::options::{
pub use crate::logical_plan::{
approx_percentile_cont, array, ascii, avg, bit_length, btrim, character_length, chr,
coalesce, col, concat, concat_ws, count, create_udf, date_part, date_trunc, digest,
in_list, initcap, left, length, lit, lower, lpad, ltrim, max, md5, min, now,
exists, in_list, initcap, left, length, lit, lower, lpad, ltrim, max, md5, min, now,
octet_length, random, regexp_match, regexp_replace, repeat, replace, reverse, right,
rpad, rtrim, sha224, sha256, sha384, sha512, split_part, starts_with, strpos, substr,
sum, to_hex, translate, trim, upper, Column, JoinType, Partitioning,
Expand Down
7 changes: 4 additions & 3 deletions datafusion/core/src/sql/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,9 +368,10 @@ where
asc: *asc,
nulls_first: *nulls_first,
}),
Expr::Column { .. } | Expr::Literal(_) | Expr::ScalarVariable(_, _) => {
Ok(expr.clone())
}
Expr::Column { .. }
| Expr::Literal(_)
| Expr::ScalarVariable(_, _)
| Expr::Exists(_) => Ok(expr.clone()),
Expr::Wildcard => Ok(Expr::Wildcard),
Expr::QualifiedWildcard { .. } => Ok(expr.clone()),
Expr::GetIndexedField { expr, key } => Ok(Expr::GetIndexedField {
Expand Down
5 changes: 5 additions & 0 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use crate::aggregate_function;
use crate::built_in_function;
use crate::expr_fn::binary_expr;
use crate::logical_plan::Subquery;
use crate::window_frame;
use crate::window_function;
use crate::AggregateUDF;
Expand Down Expand Up @@ -226,6 +227,8 @@ pub enum Expr {
/// Whether the expression is negated
negated: bool,
},
/// EXISTS subquery
Exists(Subquery),
/// Represents a reference to all fields in a schema.
Wildcard,
/// Represents a reference to all fields in a specific schema.
Expand Down Expand Up @@ -431,6 +434,7 @@ impl fmt::Debug for Expr {
Expr::Negative(expr) => write!(f, "(- {:?})", expr),
Expr::IsNull(expr) => write!(f, "{:?} IS NULL", expr),
Expr::IsNotNull(expr) => write!(f, "{:?} IS NOT NULL", expr),
Expr::Exists(subquery) => write!(f, "EXISTS ({:?})", subquery),
Expr::BinaryExpr { left, op, right } => {
write!(f, "{:?} {} {:?}", left, op, right)
}
Expand Down Expand Up @@ -618,6 +622,7 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
let expr = create_name(expr, input_schema)?;
Ok(format!("{} IS NOT NULL", expr))
}
Expr::Exists(_) => Ok("EXISTS".to_string()),
Expr::GetIndexedField { expr, key } => {
let expr = create_name(expr, input_schema)?;
Ok(format!("{}[{}]", expr, key))
Expand Down
9 changes: 8 additions & 1 deletion datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
//! Functions for creating logical expressions

use crate::conditional_expressions::CaseBuilder;
use crate::{aggregate_function, built_in_function, lit, Expr, Operator};
use crate::logical_plan::Subquery;
use crate::{aggregate_function, built_in_function, lit, Expr, LogicalPlan, Operator};
use std::sync::Arc;

/// Create a column expression based on a qualified or unqualified column name
pub fn col(ident: &str) -> Expr {
Expand Down Expand Up @@ -180,6 +182,11 @@ pub fn approx_percentile_cont_with_weight(
}
}

/// Build an `EXISTS` expression over the given logical plan.
///
/// The plan is wrapped in a [`Subquery`] and returned as [`Expr::Exists`].
pub fn exists(subquery: Arc<LogicalPlan>) -> Expr {
    let wrapped = Subquery { subquery };
    Expr::Exists(wrapped)
}

// TODO(kszucs): this seems buggy, unary_scalar_expr! is used for many
// varying arity functions
/// Create an convenience function representing a unary scalar function
Expand Down
3 changes: 2 additions & 1 deletion datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ impl ExprSchemable for Expr {
}
Expr::Not(_)
| Expr::IsNull(_)
| Expr::Exists(_)
| Expr::Between { .. }
| Expr::InList { .. }
| Expr::IsNotNull(_) => Ok(DataType::Boolean),
Expand Down Expand Up @@ -172,7 +173,7 @@ impl ExprSchemable for Expr {
| Expr::WindowFunction { .. }
| Expr::AggregateFunction { .. }
| Expr::AggregateUDF { .. } => Ok(true),
Expr::IsNull(_) | Expr::IsNotNull(_) => Ok(false),
Expr::IsNull(_) | Expr::IsNotNull(_) | Expr::Exists(_) => Ok(false),
Expr::BinaryExpr {
ref left,
ref right,
Expand Down
10 changes: 5 additions & 5 deletions datafusion/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ pub use expr_fn::{
abs, acos, and, approx_distinct, approx_percentile_cont, array, ascii, asin, atan,
avg, bit_length, btrim, case, ceil, character_length, chr, coalesce, col, concat,
concat_expr, concat_ws, concat_ws_expr, cos, count, count_distinct, date_part,
date_trunc, digest, exp, floor, in_list, initcap, left, length, ln, log10, log2,
lower, lpad, ltrim, max, md5, min, now, now_expr, nullif, octet_length, or, random,
regexp_match, regexp_replace, repeat, replace, reverse, right, round, rpad, rtrim,
sha224, sha256, sha384, sha512, signum, sin, split_part, sqrt, starts_with, strpos,
substr, sum, tan, to_hex, to_timestamp_micros, to_timestamp_millis,
date_trunc, digest, exists, exp, floor, in_list, initcap, left, length, ln, log10,
log2, lower, lpad, ltrim, max, md5, min, now, now_expr, nullif, octet_length, or,
random, regexp_match, regexp_replace, repeat, replace, reverse, right, round, rpad,
rtrim, sha224, sha256, sha384, sha512, signum, sin, split_part, sqrt, starts_with,
strpos, substr, sum, tan, to_hex, to_timestamp_micros, to_timestamp_millis,
to_timestamp_seconds, translate, trim, trunc, upper, when,
};
pub use expr_schema::ExprSchemable;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub use plan::{
Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
CreateMemoryTable, CrossJoin, DropTable, EmptyRelation, Explain, Extension, FileType,
Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType,
PlanVisitor, Projection, Repartition, Sort, StringifiedPlan, SubqueryAlias,
PlanVisitor, Projection, Repartition, Sort, StringifiedPlan, Subquery, SubqueryAlias,
TableScan, ToStringifiedPlan, Union, Values, Window,
};

Expand Down
Loading