diff --git a/rust/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/rust/ballista/rust/core/src/serde/logical_plan/to_proto.rs index a181f98b6eb..948db32beaa 100644 --- a/rust/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/rust/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -940,6 +940,7 @@ impl TryInto for &LogicalPlan { } LogicalPlan::Extension { .. } => unimplemented!(), LogicalPlan::Union { .. } => unimplemented!(), + LogicalPlan::CartesianJoin { .. } => unimplemented!(), } } } diff --git a/rust/benchmarks/src/bin/tpch.rs b/rust/benchmarks/src/bin/tpch.rs index 328a68dd6a6..b203ceb3f74 100644 --- a/rust/benchmarks/src/bin/tpch.rs +++ b/rust/benchmarks/src/bin/tpch.rs @@ -1374,6 +1374,11 @@ mod tests { run_query(6).await } + #[tokio::test] + async fn run_q9() -> Result<()> { + run_query(9).await + } + #[tokio::test] async fn run_q10() -> Result<()> { run_query(10).await diff --git a/rust/datafusion/README.md b/rust/datafusion/README.md index e5849b84ca7..70233b25ab9 100644 --- a/rust/datafusion/README.md +++ b/rust/datafusion/README.md @@ -213,7 +213,9 @@ DataFusion also includes a simple command-line interactive SQL utility. See the - [ ] MINUS - [x] Joins - [x] INNER JOIN - - [ ] CROSS JOIN + - [x] LEFT JOIN + - [x] RIGHT JOIN + - [x] CROSS JOIN - [ ] OUTER JOIN - [ ] Window diff --git a/rust/datafusion/src/logical_plan/builder.rs b/rust/datafusion/src/logical_plan/builder.rs index fed82fd23b8..4f25c73e441 100644 --- a/rust/datafusion/src/logical_plan/builder.rs +++ b/rust/datafusion/src/logical_plan/builder.rs @@ -270,6 +270,20 @@ impl LogicalPlanBuilder { })) } } + /// Apply a cartesian join + pub fn cartesian_join(&self, right: &LogicalPlan) -> Result { + let left_fields = self.plan.schema().fields().iter(); + let right_fields = right.schema().fields(); + let fields = left_fields.chain(right_fields).cloned().collect(); + + let schema = DFSchema::new(fields)?; + + Ok(Self::from(&LogicalPlan::CartesianJoin { + left: Arc::new(self.plan.clone()), + right: Arc::new(right.clone()), + schema: DFSchemaRef::new(schema), + })) + } /// Repartition pub fn repartition(&self, partitioning_scheme: Partitioning) -> Result { diff --git a/rust/datafusion/src/logical_plan/plan.rs b/rust/datafusion/src/logical_plan/plan.rs index d1b9b827a5a..9ad3791a6ad 100644 --- a/rust/datafusion/src/logical_plan/plan.rs +++ b/rust/datafusion/src/logical_plan/plan.rs @@ -113,6 +113,15 @@ pub enum LogicalPlan { /// The output schema, containing fields from the left and right inputs schema: DFSchemaRef, }, + /// Join two logical plans on one or more join columns + CartesianJoin { + /// Left input + left: Arc, + /// Right input + right: Arc, + /// The output schema, containing fields from the left and right inputs + schema: DFSchemaRef, + }, /// Repartition the plan based on a partitioning scheme. Repartition { /// The incoming logical plan @@ -203,6 +212,7 @@ impl LogicalPlan { LogicalPlan::Aggregate { schema, .. } => &schema, LogicalPlan::Sort { input, .. } => input.schema(), LogicalPlan::Join { schema, .. } => &schema, + LogicalPlan::CartesianJoin { schema, .. } => &schema, LogicalPlan::Repartition { input, .. } => input.schema(), LogicalPlan::Limit { input, .. } => input.schema(), LogicalPlan::CreateExternalTable { schema, .. } => &schema, @@ -229,6 +239,11 @@ impl LogicalPlan { right, schema, .. + } + | LogicalPlan::CartesianJoin { + left, + right, + schema, } => { let mut schemas = left.all_schemas(); schemas.extend(right.all_schemas()); @@ -290,8 +305,9 @@ impl LogicalPlan { | LogicalPlan::EmptyRelation { .. } | LogicalPlan::Limit { .. } | LogicalPlan::CreateExternalTable { .. } - | LogicalPlan::Explain { .. } => vec![], - LogicalPlan::Union { .. } => { + | LogicalPlan::CartesianJoin { .. } + | LogicalPlan::Explain { .. } + | LogicalPlan::Union { .. } => { vec![] } } @@ -307,6 +323,7 @@ impl LogicalPlan { LogicalPlan::Aggregate { input, .. } => vec![input], LogicalPlan::Sort { input, .. } => vec![input], LogicalPlan::Join { left, right, .. } => vec![left, right], + LogicalPlan::CartesianJoin { left, right, .. } => vec![left, right], LogicalPlan::Limit { input, .. } => vec![input], LogicalPlan::Extension { node } => node.inputs(), LogicalPlan::Union { inputs, .. } => inputs.iter().collect(), @@ -396,7 +413,8 @@ impl LogicalPlan { LogicalPlan::Repartition { input, .. } => input.accept(visitor)?, LogicalPlan::Aggregate { input, .. } => input.accept(visitor)?, LogicalPlan::Sort { input, .. } => input.accept(visitor)?, - LogicalPlan::Join { left, right, .. } => { + LogicalPlan::Join { left, right, .. } + | LogicalPlan::CartesianJoin { left, right, .. } => { left.accept(visitor)? && right.accept(visitor)? } LogicalPlan::Union { inputs, .. } => { @@ -669,6 +687,9 @@ impl LogicalPlan { keys.iter().map(|(l, r)| format!("{} = {}", l, r)).collect(); write!(f, "Join: {}", join_expr.join(", ")) } + LogicalPlan::CartesianJoin { .. } => { + write!(f, "CartesianJoin:") + } LogicalPlan::Repartition { partitioning_scheme, .. diff --git a/rust/datafusion/src/optimizer/constant_folding.rs b/rust/datafusion/src/optimizer/constant_folding.rs index 2fa03eb5c70..5c391512a04 100644 --- a/rust/datafusion/src/optimizer/constant_folding.rs +++ b/rust/datafusion/src/optimizer/constant_folding.rs @@ -72,7 +72,8 @@ impl OptimizerRule for ConstantFolding { | LogicalPlan::Explain { .. } | LogicalPlan::Limit { .. } | LogicalPlan::Union { .. } - | LogicalPlan::Join { .. } => { + | LogicalPlan::Join { .. } + | LogicalPlan::CartesianJoin { .. } => { // apply the optimization to all inputs of the plan let inputs = plan.inputs(); let new_inputs = inputs diff --git a/rust/datafusion/src/optimizer/hash_build_probe_order.rs b/rust/datafusion/src/optimizer/hash_build_probe_order.rs index f44050f0b72..3270c2850a2 100644 --- a/rust/datafusion/src/optimizer/hash_build_probe_order.rs +++ b/rust/datafusion/src/optimizer/hash_build_probe_order.rs @@ -67,6 +67,10 @@ fn get_num_rows(logical_plan: &LogicalPlan) -> Option { // we cannot predict the cardinality of the join output None } + LogicalPlan::CartesianJoin { left, right, .. } => { + // number of rows is equal to num_left * num_right + get_num_rows(left).and_then(|x| get_num_rows(right).map(|y| x * y)) + } LogicalPlan::Repartition { .. } => { // we cannot predict how rows will be repartitioned None @@ -138,6 +142,29 @@ impl OptimizerRule for HashBuildProbeOrder { }) } } + LogicalPlan::CartesianJoin { + left, + right, + schema, + } => { + let left = self.optimize(left)?; + let right = self.optimize(right)?; + if should_swap_join_order(&left, &right) { + // Swap left and right + Ok(LogicalPlan::CartesianJoin { + left: Arc::new(right), + right: Arc::new(left), + schema: schema.clone(), + }) + } else { + // Keep join as is + Ok(LogicalPlan::CartesianJoin { + left: Arc::new(left), + right: Arc::new(right), + schema: schema.clone(), + }) + } + } // Rest: recurse into plan, apply optimization where possible LogicalPlan::Projection { .. } | LogicalPlan::Aggregate { .. } diff --git a/rust/datafusion/src/optimizer/projection_push_down.rs b/rust/datafusion/src/optimizer/projection_push_down.rs index 6b1cdfe18ca..118c142dd9a 100644 --- a/rust/datafusion/src/optimizer/projection_push_down.rs +++ b/rust/datafusion/src/optimizer/projection_push_down.rs @@ -270,6 +270,7 @@ fn optimize_plan( | LogicalPlan::Sort { .. } | LogicalPlan::CreateExternalTable { .. } | LogicalPlan::Union { .. } + | LogicalPlan::CartesianJoin { .. } | LogicalPlan::Extension { .. } => { let expr = plan.expressions(); // collect all required columns by this plan diff --git a/rust/datafusion/src/optimizer/utils.rs b/rust/datafusion/src/optimizer/utils.rs index fe1d0238191..eb91f25921c 100644 --- a/rust/datafusion/src/optimizer/utils.rs +++ b/rust/datafusion/src/optimizer/utils.rs @@ -208,6 +208,11 @@ pub fn from_plan( on: on.clone(), schema: schema.clone(), }), + LogicalPlan::CartesianJoin { schema, .. } => Ok(LogicalPlan::CartesianJoin { + left: Arc::new(inputs[0].clone()), + right: Arc::new(inputs[1].clone()), + schema: schema.clone(), + }), LogicalPlan::Limit { n, .. } => Ok(LogicalPlan::Limit { n: *n, input: Arc::new(inputs[0].clone()), diff --git a/rust/datafusion/src/physical_plan/cartesian_join.rs b/rust/datafusion/src/physical_plan/cartesian_join.rs new file mode 100644 index 00000000000..d394b917f40 --- /dev/null +++ b/rust/datafusion/src/physical_plan/cartesian_join.rs @@ -0,0 +1,283 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Defines the join plan for executing partitions in parallel and then joining the results +//! into a set of partitions. + +use futures::StreamExt; +use std::{any::Any, sync::Arc}; + +use arrow::datatypes::{Schema, SchemaRef}; +use arrow::error::Result as ArrowResult; +use arrow::record_batch::RecordBatch; +use futures::{Stream, TryStreamExt}; + +use futures::lock::Mutex; + +use super::{hash_utils::check_join_is_valid, merge::MergeExec}; +use crate::{ + error::{DataFusionError, Result}, + scalar::ScalarValue, +}; +use async_trait::async_trait; +use std::time::Instant; + +use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream}; +use crate::physical_plan::coalesce_batches::concat_batches; +use log::debug; + +/// Data of the left side +type JoinLeftData = Vec; + +/// executes partitions in parallel and combines them into a set of +/// partitions by combining all values from the left with all values on the right +#[derive(Debug)] +pub struct CartesianJoinExec { + /// left (build) side which gets loaded in memory + left: Arc, + /// right (probe) side which are combined with left side + right: Arc, + /// The schema once the join is applied + schema: SchemaRef, + /// Build-side data + build_side: Arc>>, +} + +impl CartesianJoinExec { + /// Tries to create a new [CartesianJoinExec]. + /// # Error + /// This function errors when it is not possible to join the left and right sides on keys `on`. + pub fn try_new( + left: Arc, + right: Arc, + ) -> Result { + let left_schema = left.schema(); + let right_schema = right.schema(); + check_join_is_valid(&left_schema, &right_schema, &[])?; + + let left_schema = left.schema(); + let left_fields = left_schema.fields().iter(); + let right_schema = right.schema(); + + let right_fields = right_schema.fields().iter(); + + // left then right + let all_columns = left_fields.chain(right_fields).cloned().collect(); + + let schema = Arc::new(Schema::new(all_columns)); + + Ok(CartesianJoinExec { + left, + right, + schema, + build_side: Arc::new(Mutex::new(None)), + }) + } + + /// left (build) side which gets loaded in memory + pub fn left(&self) -> &Arc { + &self.left + } + + /// right side which gets combined with left side + pub fn right(&self) -> &Arc { + &self.right + } +} + +#[async_trait] +impl ExecutionPlan for CartesianJoinExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn children(&self) -> Vec> { + vec![self.left.clone(), self.right.clone()] + } + + fn with_new_children( + &self, + children: Vec>, + ) -> Result> { + match children.len() { + 2 => Ok(Arc::new(CartesianJoinExec::try_new( + children[0].clone(), + children[1].clone(), + )?)), + _ => Err(DataFusionError::Internal( + "CartesianJoinExec wrong number of children".to_string(), + )), + } + } + + fn output_partitioning(&self) -> Partitioning { + self.right.output_partitioning() + } + + async fn execute(&self, partition: usize) -> Result { + // we only want to compute the build side once + let left_data = { + let mut build_side = self.build_side.lock().await; + + match build_side.as_ref() { + Some(stream) => stream.clone(), + None => { + let start = Instant::now(); + + // merge all left parts into a single stream + let merge = MergeExec::new(self.left.clone()); + let stream = merge.execute(0).await?; + + // Load all batches and count the rows + let (batches, num_rows) = stream + .try_fold((Vec::new(), 0usize), |mut acc, batch| async { + acc.1 += batch.num_rows(); + acc.0.push(batch); + Ok(acc) + }) + .await?; + + *build_side = Some(batches.clone()); + + debug!( + "Built build-side of cartesian join containing {} rows in {} ms", + num_rows, + start.elapsed().as_millis() + ); + + batches + } + } + }; + + let stream = self.right.execute(partition).await?; + + Ok(Box::pin(CartesianJoinStream { + schema: self.schema.clone(), + left_data, + right: stream, + num_input_batches: 0, + num_input_rows: 0, + num_output_batches: 0, + num_output_rows: 0, + join_time: 0, + })) + } +} + +/// A stream that issues [RecordBatch]es as they arrive from the right of the join. +struct CartesianJoinStream { + /// Input schema + schema: Arc, + /// data from the left side + left_data: JoinLeftData, + /// right + right: SendableRecordBatchStream, + /// number of input batches + num_input_batches: usize, + /// number of input rows + num_input_rows: usize, + /// number of batches produced + num_output_batches: usize, + /// number of rows produced + num_output_rows: usize, + /// total time for joining probe-side batches to the build-side batches + join_time: usize, +} + +impl RecordBatchStream for CartesianJoinStream { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} +fn build_batch( + batch: &RecordBatch, + left_data: &[RecordBatch], + schema: &Schema, +) -> ArrowResult { + let mut batches = Vec::new(); + let mut num_rows = 0; + for left in left_data.iter() { + for i in 0..left.num_rows() { + // for each value on the left, repeat the value of the right + let arrays = left + .columns() + .iter() + .map(|arr| { + let scalar = ScalarValue::try_from_array(arr, i)?; + Ok(scalar.to_array_of_size(batch.num_rows())) + }) + .collect::>>() + .map_err(|x| x.into_arrow_external_error())?; + + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + arrays + .iter() + .chain(batch.columns().iter()) + .cloned() + .collect(), + )?; + + batches.push(batch); + num_rows += left.num_rows(); + } + } + concat_batches(&Arc::new(schema.clone()), &batches, num_rows) +} + +impl Stream for CartesianJoinStream { + type Item = ArrowResult; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.right + .poll_next_unpin(cx) + .map(|maybe_batch| match maybe_batch { + Some(Ok(batch)) => { + let start = Instant::now(); + let result = build_batch(&batch, &self.left_data, &self.schema); + self.num_input_batches += 1; + self.num_input_rows += batch.num_rows(); + if let Ok(ref batch) = result { + self.join_time += start.elapsed().as_millis() as usize; + self.num_output_batches += 1; + self.num_output_rows += batch.num_rows(); + } + Some(result) + } + other => { + debug!( + "Processed {} probe-side input batches containing {} rows and \ + produced {} output batches containing {} rows in {} ms", + self.num_input_batches, + self.num_input_rows, + self.num_output_batches, + self.num_output_rows, + self.join_time + ); + other + } + }) + } +} diff --git a/rust/datafusion/src/physical_plan/hash_utils.rs b/rust/datafusion/src/physical_plan/hash_utils.rs index b26ff9bb5fc..a38cc092123 100644 --- a/rust/datafusion/src/physical_plan/hash_utils.rs +++ b/rust/datafusion/src/physical_plan/hash_utils.rs @@ -52,11 +52,6 @@ fn check_join_set_is_valid( right: &HashSet, on: &JoinOn, ) -> Result<()> { - if on.is_empty() { - return Err(DataFusionError::Plan( - "The 'on' clause of a join cannot be empty".to_string(), - )); - } let on_left = &on.iter().map(|on| on.0.to_string()).collect::>(); let left_missing = on_left.difference(left).collect::>(); diff --git a/rust/datafusion/src/physical_plan/mod.rs b/rust/datafusion/src/physical_plan/mod.rs index 5036dcb921b..bf898517685 100644 --- a/rust/datafusion/src/physical_plan/mod.rs +++ b/rust/datafusion/src/physical_plan/mod.rs @@ -333,6 +333,7 @@ pub trait Accumulator: Send + Sync + Debug { pub mod aggregates; pub mod array_expressions; +pub mod cartesian_join; pub mod coalesce_batches; pub mod common; #[cfg(feature = "crypto_expressions")] diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs index f9279ae48f0..9d0f020c0dc 100644 --- a/rust/datafusion/src/physical_plan/planner.rs +++ b/rust/datafusion/src/physical_plan/planner.rs @@ -20,8 +20,8 @@ use std::sync::Arc; use super::{ - aggregates, empty::EmptyExec, expressions::binary, functions, - hash_join::PartitionMode, udaf, union::UnionExec, + aggregates, cartesian_join::CartesianJoinExec, empty::EmptyExec, expressions::binary, + functions, hash_join::PartitionMode, udaf, union::UnionExec, }; use crate::error::{DataFusionError, Result}; use crate::execution::context::ExecutionContextState; @@ -328,6 +328,11 @@ impl DefaultPhysicalPlanner { )?)) } } + LogicalPlan::CartesianJoin { left, right, .. } => { + let left = self.create_initial_plan(left, ctx_state)?; + let right = self.create_initial_plan(right, ctx_state)?; + Ok(Arc::new(CartesianJoinExec::try_new(left, right)?)) + } LogicalPlan::EmptyRelation { produce_one_row, schema, diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index f3cba232a23..4ea1c7e1696 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -355,12 +355,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { JoinOperator::Inner(constraint) => { self.parse_join(left, &right, constraint, JoinType::Inner) } + JoinOperator::CrossJoin => self.parse_cross_join(left, &right), other => Err(DataFusionError::NotImplemented(format!( "Unsupported JOIN operator {:?}", other ))), } } + fn parse_cross_join( + &self, + left: &LogicalPlan, + right: &LogicalPlan, + ) -> Result { + LogicalPlanBuilder::from(&left) + .cartesian_join(&right)? + .build() + } fn parse_join( &self, @@ -489,9 +499,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } } if join_keys.is_empty() { - return Err(DataFusionError::NotImplemented( - "Cartesian joins are not supported".to_string(), - )); + left = LogicalPlanBuilder::from(&left) + .cartesian_join(right)? + .build()?; } else { let left_keys: Vec<_> = join_keys.iter().map(|(l, _)| *l).collect(); @@ -517,9 +527,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { if plans.len() == 1 { Ok(plans[0].clone()) } else { - Err(DataFusionError::NotImplemented( - "Cartesian joins are not supported".to_string(), - )) + let mut left = plans[0].clone(); + for right in plans.iter().skip(1) { + left = LogicalPlanBuilder::from(&left) + .cartesian_join(right)? + .build()?; + } + Ok(left) } } }; diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs index f4d4e65f3a4..10ba8cd2bfd 100644 --- a/rust/datafusion/tests/sql.rs +++ b/rust/datafusion/tests/sql.rs @@ -1290,13 +1290,45 @@ async fn equijoin_implicit_syntax_reversed() -> Result<()> { #[tokio::test] async fn cartesian_join() -> Result<()> { - let ctx = create_join_context("t1_id", "t2_id")?; + let mut ctx = create_join_context("t1_id", "t2_id")?; + let sql = "SELECT t1_id, t1_name, t2_name FROM t1, t2 ORDER BY t1_id"; - let maybe_plan = ctx.create_logical_plan(&sql); + let actual = execute(&mut ctx, sql).await; + + assert_eq!(4 * 4, actual.len()); + + let sql = "SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE 1=1 ORDER BY t1_id"; + let actual = execute(&mut ctx, sql).await; + + assert_eq!(4 * 4, actual.len()); + + let sql = "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN t2"; + let actual = execute(&mut ctx, sql).await; + + assert_eq!(4 * 4, actual.len()); + assert_eq!( - "This feature is not implemented: Cartesian joins are not supported", - &format!("{}", maybe_plan.err().unwrap()) + actual, + [ + ["11", "a", "z"], + ["11", "a", "y"], + ["11", "a", "x"], + ["11", "a", "w"], + ["22", "b", "z"], + ["22", "b", "y"], + ["22", "b", "x"], + ["22", "b", "w"], + ["33", "c", "z"], + ["33", "c", "y"], + ["33", "c", "x"], + ["33", "c", "w"], + ["44", "d", "z"], + ["44", "d", "y"], + ["44", "d", "x"], + ["44", "d", "w"] + ] ); + Ok(()) }