diff --git a/benchmarks/README.md b/benchmarks/README.md index 4e226376c0e7..872500ef849f 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -440,37 +440,6 @@ Your benchmark should create and use an instance of `BenchmarkRun` defined in `b The output of `dfbench` help includes a description of each benchmark, which is reproduced here for convenience. -## Cancellation - -Test performance of cancelling queries. - -Queries in DataFusion should stop executing "quickly" after they are -cancelled (the output stream is dropped). - -The queries are executed on a synthetic dataset generated during -the benchmark execution that is an anonymized version of a -real-world data set. - -The query is an anonymized version of a real-world query, and the -test starts the query then cancels it and reports how long it takes -for the runtime to fully exit. - -Example output: - -``` -Using 7 files found on disk -Starting to load data into in-memory object store -Done loading data into in-memory object store -in main, sleeping -Starting spawned -Creating logical plan... -Creating physical plan... -Executing physical plan... -Getting results... -cancelling thread -done dropping runtime in 83.531417ms -``` - ## ClickBench The ClickBench[1] benchmarks are widely cited in the industry and @@ -741,3 +710,50 @@ For example, to run query 1 with the small data generated above: ```bash cargo run --release --bin dfbench -- h2o --join-paths ./benchmarks/data/h2o/J1_1e7_NA_0.csv,./benchmarks/data/h2o/J1_1e7_1e1_0.csv,./benchmarks/data/h2o/J1_1e7_1e4_0.csv,./benchmarks/data/h2o/J1_1e7_1e7_NA.csv --queries-path ./benchmarks/queries/h2o/window.sql --query 1 ``` + +# Micro-Benchmarks + +## Nested Loop Join + +This benchmark focuses on the performance of queries with nested loop joins, minimizing other overheads such as scanning data sources or evaluating predicates. + +Different queries are included to test nested loop joins under various workloads. 
+ +### Example Run + +```bash +# No need to generate data: this benchmark uses table function `range()` as the data source + +./bench.sh run nlj +``` + +## Cancellation + +Test performance of cancelling queries. + +Queries in DataFusion should stop executing "quickly" after they are +cancelled (the output stream is dropped). + +The queries are executed on a synthetic dataset generated during +the benchmark execution that is an anonymized version of a +real-world data set. + +The query is an anonymized version of a real-world query, and the +test starts the query then cancels it and reports how long it takes +for the runtime to fully exit. + +Example output: + +``` +Using 7 files found on disk +Starting to load data into in-memory object store +Done loading data into in-memory object store +in main, sleeping +Starting spawned +Creating logical plan... +Creating physical plan... +Executing physical plan... +Getting results... +cancelling thread +done dropping runtime in 83.531417ms +``` diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index 8952e456398d..b99ab010058f 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -124,6 +124,7 @@ imdb: Join Order Benchmark (JOB) using the IMDB dataset conver # Micro-Benchmarks (specific operators and features) cancellation: How long cancelling a query takes +nlj: Benchmark for simple nested loop joins, testing various join scenarios ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ Supported Configuration (Environment Variables) @@ -196,6 +197,7 @@ main() { data_clickbench_1 data_clickbench_partitioned data_imdb + # nlj uses range() function, no data generation needed ;; tpch) data_tpch "1" @@ -298,6 +300,10 @@ main() { # same data as for tpch data_tpch "1" ;; + nlj) + # nlj uses range() function, no data generation needed + echo "NLJ benchmark does not require data generation" + ;; *) echo "Error: unknown benchmark '$BENCHMARK' for data generation" usage @@ -354,6 +360,7 @@ main() { run_h2o_join "BIG" 
"PARQUET" "join" run_imdb run_external_aggr + run_nlj ;; tpch) run_tpch "1" "parquet" @@ -458,6 +465,9 @@ main() { topk_tpch) run_topk_tpch ;; + nlj) + run_nlj + ;; *) echo "Error: unknown benchmark '$BENCHMARK' for run" usage @@ -1085,6 +1095,14 @@ run_topk_tpch() { $CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" --limit 100 ${QUERY_ARG} } +# Runs the nlj benchmark +run_nlj() { + RESULTS_FILE="${RESULTS_DIR}/nlj.json" + echo "RESULTS_FILE: ${RESULTS_FILE}" + echo "Running nlj benchmark..." + debug_run $CARGO_COMMAND --bin dfbench -- nlj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG} +} + compare_benchmarks() { BASE_RESULTS_DIR="${SCRIPT_DIR}/results" diff --git a/benchmarks/src/bin/dfbench.rs b/benchmarks/src/bin/dfbench.rs index e92fd115c7d8..88378492b726 100644 --- a/benchmarks/src/bin/dfbench.rs +++ b/benchmarks/src/bin/dfbench.rs @@ -33,7 +33,7 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; #[global_allocator] static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; -use datafusion_benchmarks::{cancellation, clickbench, h2o, imdb, sort_tpch, tpch}; +use datafusion_benchmarks::{cancellation, clickbench, h2o, imdb, nlj, sort_tpch, tpch}; #[derive(Debug, StructOpt)] #[structopt(about = "benchmark command")] @@ -42,6 +42,7 @@ enum Options { Clickbench(clickbench::RunOpt), H2o(h2o::RunOpt), Imdb(imdb::RunOpt), + Nlj(nlj::RunOpt), SortTpch(sort_tpch::RunOpt), Tpch(tpch::RunOpt), TpchConvert(tpch::ConvertOpt), @@ -57,6 +58,7 @@ pub async fn main() -> Result<()> { Options::Clickbench(opt) => opt.run().await, Options::H2o(opt) => opt.run().await, Options::Imdb(opt) => Box::pin(opt.run()).await, + Options::Nlj(opt) => opt.run().await, Options::SortTpch(opt) => opt.run().await, Options::Tpch(opt) => Box::pin(opt.run()).await, Options::TpchConvert(opt) => opt.run().await, diff --git a/benchmarks/src/lib.rs b/benchmarks/src/lib.rs index e7657c4078d1..5d982fad6f77 100644 --- a/benchmarks/src/lib.rs +++ 
b/benchmarks/src/lib.rs @@ -20,6 +20,7 @@ pub mod cancellation; pub mod clickbench; pub mod h2o; pub mod imdb; +pub mod nlj; pub mod sort_tpch; pub mod tpch; pub mod util; diff --git a/benchmarks/src/nlj.rs b/benchmarks/src/nlj.rs new file mode 100644 index 000000000000..e412c0ade8a8 --- /dev/null +++ b/benchmarks/src/nlj.rs @@ -0,0 +1,264 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::util::{BenchmarkRun, CommonOpt, QueryResult}; +use datafusion::physical_plan::execute_stream; +use datafusion::{error::Result, prelude::SessionContext}; +use datafusion_common::instant::Instant; +use datafusion_common::{exec_datafusion_err, exec_err, DataFusionError}; +use structopt::StructOpt; + +use futures::StreamExt; + +/// Run the Nested Loop Join (NLJ) benchmark +/// +/// This micro-benchmark focuses on the performance characteristics of NLJs. +/// +/// It always tries to use fast scanners (without decoding overhead) and +/// efficient predicate expressions to ensure it can reflect the performance +/// of the NLJ operator itself. 
+/// +/// In this micro-benchmark, the following workload characteristics will be +/// varied: +/// - Join type: Inner/Left/Right/Full (all for the NestedLoopJoin physical +/// operator) +/// TODO: Include special join types (Semi/Anti/Mark joins) +/// - Input size: Different combinations of left (build) side and right (probe) +/// side sizes +/// - Selectivity of join filters +#[derive(Debug, StructOpt, Clone)] +#[structopt(verbatim_doc_comment)] +pub struct RunOpt { + /// Query number (between 1 and 12). If not specified, runs all queries + #[structopt(short, long)] + query: Option<usize>, + + /// Common options + #[structopt(flatten)] + common: CommonOpt, + + /// If present, write results json here + #[structopt(parse(from_os_str), short = "o", long = "output")] + output_path: Option<std::path::PathBuf>, +} + +/// Inline SQL queries for NLJ benchmarks +/// +/// Each query's comment includes: +/// - Left (build) side row count × Right (probe) side row count +/// - Join predicate selectivity (1% means the output size is 1% * input size) +const NLJ_QUERIES: &[&str] = &[ + // Q1: INNER 10K x 10K | LOW 0.1% + r#" + SELECT * + FROM range(10000) AS t1 + JOIN range(10000) AS t2 + ON (t1.value + t2.value) % 1000 = 0; + "#, + // Q2: INNER 10K x 10K | Medium 20% + r#" + SELECT * + FROM range(10000) AS t1 + JOIN range(10000) AS t2 + ON (t1.value + t2.value) % 5 = 0; + "#, + // Q3: INNER 10K x 10K | High 90% + r#" + SELECT * + FROM range(10000) AS t1 + JOIN range(10000) AS t2 + ON (t1.value + t2.value) % 10 <> 0; + "#, + // Q4: INNER 30K x 30K | Medium 20% + r#" + SELECT * + FROM range(30000) AS t1 + JOIN range(30000) AS t2 + ON (t1.value + t2.value) % 5 = 0; + "#, + // Q5: INNER 10K x 200K | LOW 0.1% (small to large) + r#" + SELECT * + FROM range(10000) AS t1 + JOIN range(200000) AS t2 + ON (t1.value + t2.value) % 1000 = 0; + "#, + // Q6: INNER 200K x 10K | LOW 0.1% (large to small) + r#" + SELECT * + FROM range(200000) AS t1 + JOIN range(10000) AS t2 + ON (t1.value + t2.value) % 1000 = 0; + "#, + // 
Q7: RIGHT OUTER 10K x 200K | LOW 0.1% + r#" + SELECT * + FROM range(10000) AS t1 + RIGHT JOIN range(200000) AS t2 + ON (t1.value + t2.value) % 1000 = 0; + "#, + // Q8: LEFT OUTER 200K x 10K | LOW 0.1% + r#" + SELECT * + FROM range(200000) AS t1 + LEFT JOIN range(10000) AS t2 + ON (t1.value + t2.value) % 1000 = 0; + "#, + // Q9: FULL OUTER 30K x 30K | LOW 0.1% + r#" + SELECT * + FROM range(30000) AS t1 + FULL JOIN range(30000) AS t2 + ON (t1.value + t2.value) % 1000 = 0; + "#, + // Q10: FULL OUTER 30K x 30K | High 90% + r#" + SELECT * + FROM range(30000) AS t1 + FULL JOIN range(30000) AS t2 + ON (t1.value + t2.value) % 10 <> 0; + "#, + // Q11: INNER 30K x 30K | MEDIUM 50% | cheap predicate + r#" + SELECT * + FROM range(30000) AS t1 + INNER JOIN range(30000) AS t2 + ON (t1.value > t2.value); + "#, + // Q12: FULL OUTER 30K x 30K | MEDIUM 50% | cheap predicate + r#" + SELECT * + FROM range(30000) AS t1 + FULL JOIN range(30000) AS t2 + ON (t1.value > t2.value); + "#, +]; + +impl RunOpt { + pub async fn run(self) -> Result<()> { + println!("Running NLJ benchmarks with the following options: {self:#?}\n"); + + // Define query range + let query_range = match self.query { + Some(query_id) => { + if query_id >= 1 && query_id <= NLJ_QUERIES.len() { + query_id..=query_id + } else { + return exec_err!( + "Query {query_id} not found. 
Available queries: 1 to {}", + NLJ_QUERIES.len() + ); + } + } + None => 1..=NLJ_QUERIES.len(), + }; + + let config = self.common.config()?; + let rt_builder = self.common.runtime_env_builder()?; + let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?); + + let mut benchmark_run = BenchmarkRun::new(); + for query_id in query_range { + let query_index = query_id - 1; // Convert 1-based to 0-based index + + let sql = NLJ_QUERIES[query_index]; + benchmark_run.start_new_case(&format!("Query {query_id}")); + let query_run = self.benchmark_query(sql, &query_id.to_string(), &ctx).await; + match query_run { + Ok(query_results) => { + for iter in query_results { + benchmark_run.write_iter(iter.elapsed, iter.row_count); + } + } + Err(e) => { + return Err(DataFusionError::Context( + format!("NLJ benchmark Q{query_id} failed with error:"), + Box::new(e), + )); + } + } + } + + benchmark_run.maybe_write_json(self.output_path.as_ref())?; + Ok(()) + } + + /// Validates that the query's physical plan uses a NestedLoopJoin (NLJ), + /// then executes the query and collects execution times. + /// + /// TODO: ensure the optimizer won't change the join order (it's not at + /// v48.0.0). + async fn benchmark_query( + &self, + sql: &str, + query_name: &str, + ctx: &SessionContext, + ) -> Result<Vec<QueryResult>> { + let mut query_results = vec![]; + + // Validate that the query plan includes a Nested Loop Join + let df = ctx.sql(sql).await?; + let physical_plan = df.create_physical_plan().await?; + let plan_string = format!("{physical_plan:#?}"); + + if !plan_string.contains("NestedLoopJoinExec") { + return Err(exec_datafusion_err!( + "Query {query_name} does not use Nested Loop Join. 
Physical plan: {plan_string}" + )); + } + + for i in 0..self.common.iterations { + let start = Instant::now(); + + let row_count = Self::execute_sql_without_result_buffering(sql, ctx).await?; + + let elapsed = start.elapsed(); + + println!( + "Query {query_name} iteration {i} returned {row_count} rows in {elapsed:?}" + ); + + query_results.push(QueryResult { elapsed, row_count }); + } + + Ok(query_results) + } + + /// Executes the SQL query and drops each result batch after evaluation, to + /// minimize memory usage by not buffering results. + /// + /// Returns the total result row count + async fn execute_sql_without_result_buffering( + sql: &str, + ctx: &SessionContext, + ) -> Result<usize> { + let mut row_count = 0; + + let df = ctx.sql(sql).await?; + let physical_plan = df.create_physical_plan().await?; + let mut stream = execute_stream(physical_plan, ctx.task_ctx())?; + + while let Some(batch) = stream.next().await { + row_count += batch?.num_rows(); + + // Evaluate the result and do nothing, the result will be dropped + // to reduce memory pressure + } + + Ok(row_count) + } +} diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 5bb1673d4af2..596e890b879c 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -18,43 +18,45 @@ //! [`NestedLoopJoinExec`]: joins without equijoin (equality predicates). 
use std::any::Any; -use std::cmp::min; use std::fmt::Formatter; +use std::ops::{BitOr, ControlFlow}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::task::Poll; use super::utils::{ - asymmetric_join_output_partitioning, get_final_indices_from_shared_bitmap, - need_produce_result_in_final, reorder_output_after_swap, swap_join_projection, - StatefulStreamResult, + asymmetric_join_output_partitioning, need_produce_result_in_final, + reorder_output_after_swap, swap_join_projection, }; use crate::common::can_project; use crate::execution_plan::{boundedness_from_children, EmissionType}; use crate::joins::utils::{ - adjust_indices_by_join_type, apply_join_filter_to_indices, build_batch_from_indices, build_join_schema, check_join_is_valid, estimate_join_statistics, - BuildProbeJoinMetrics, ColumnIndex, JoinFilter, OnceAsync, OnceFut, + need_produce_right_in_final, BuildProbeJoinMetrics, ColumnIndex, JoinFilter, + OnceAsync, OnceFut, }; use crate::joins::SharedBitmapBuilder; -use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; +use crate::metrics::{Count, ExecutionPlanMetricsSet, MetricsSet}; use crate::projection::{ try_embed_projection, try_pushdown_through_join, EmbeddedProjection, JoinData, ProjectionExec, }; use crate::{ - handle_state, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, - ExecutionPlanProperties, PlanProperties, RecordBatchStream, - SendableRecordBatchStream, + DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, + PlanProperties, RecordBatchStream, SendableRecordBatchStream, }; -use arrow::array::{BooleanBufferBuilder, PrimitiveArray, UInt32Array, UInt64Array}; -use arrow::compute::concat_batches; -use arrow::datatypes::{Schema, SchemaRef, UInt32Type, UInt64Type}; +use arrow::array::{ + new_null_array, Array, BooleanArray, BooleanBufferBuilder, RecordBatchOptions, +}; +use arrow::buffer::BooleanBuffer; +use arrow::compute::{concat_batches, filter, filter_record_batch, not, 
BatchCoalescer}; +use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; +use datafusion_common::cast::as_boolean_array; use datafusion_common::{ - exec_datafusion_err, internal_datafusion_err, internal_err, project_schema, JoinSide, - Result, Statistics, + arrow_err, internal_datafusion_err, internal_err, project_schema, + unwrap_or_internal_err, DataFusionError, JoinSide, Result, ScalarValue, Statistics, }; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::TaskContext; @@ -63,92 +65,101 @@ use datafusion_physical_expr::equivalence::{ join_equivalence_properties, ProjectionMapping, }; -use futures::{ready, Stream, StreamExt, TryStreamExt}; +use futures::{Stream, StreamExt, TryStreamExt}; +use log::debug; use parking_lot::Mutex; -/// Left (build-side) data -struct JoinLeftData { - /// Build-side data collected to single batch - batch: RecordBatch, - /// Shared bitmap builder for visited left indices - bitmap: SharedBitmapBuilder, - /// Counter of running probe-threads, potentially able to update `bitmap` - probe_threads_counter: AtomicUsize, - /// Memory reservation for tracking batch and bitmap - /// Cleared on `JoinLeftData` drop - /// reservation is cleared on Drop - #[expect(dead_code)] - reservation: MemoryReservation, -} - -impl JoinLeftData { - fn new( - batch: RecordBatch, - bitmap: SharedBitmapBuilder, - probe_threads_counter: AtomicUsize, - reservation: MemoryReservation, - ) -> Self { - Self { - batch, - bitmap, - probe_threads_counter, - reservation, - } - } - - fn batch(&self) -> &RecordBatch { - &self.batch - } - - fn bitmap(&self) -> &SharedBitmapBuilder { - &self.bitmap - } - - /// Decrements counter of running threads, and returns `true` - /// if caller is the last running thread - fn report_probe_completed(&self) -> bool { - self.probe_threads_counter.fetch_sub(1, Ordering::Relaxed) == 1 - } -} - #[allow(rustdoc::private_intra_doc_links)] -/// NestedLoopJoinExec is 
build-probe join operator, whose main task is to -/// perform joins without any equijoin conditions in `ON` clause. +/// NestedLoopJoinExec is a build-probe join operator designed for joins that +/// do not have equijoin keys in their `ON` clause. /// -/// Execution consists of following phases: +/// # Execution Flow /// -/// #### 1. Build phase -/// Collecting build-side data in memory, by polling all available data from build-side input. -/// Due to the absence of equijoin conditions, it's not possible to partition build-side data -/// across multiple threads of the operator, so build-side is always collected in a single -/// batch shared across all threads. -/// The operator always considers LEFT input as build-side input, so it's crucial to adjust -/// smaller input to be the LEFT one. Normally this selection is handled by physical optimizer. +/// ```text +/// Incoming right batch +/// Left Side Buffered Batches +/// ┌───────────┐ ┌───────────────┐ +/// │ ┌───────┐ │ │ │ +/// │ │ │ │ │ │ +/// Current Left Row ───▶│ ├───────├─┤──────────┐ │ │ +/// │ │ │ │ │ └───────────────┘ +/// │ │ │ │ │ │ +/// │ │ │ │ │ │ +/// │ └───────┘ │ │ │ +/// │ ┌───────┐ │ │ │ +/// │ │ │ │ │ ┌─────┘ +/// │ │ │ │ │ │ +/// │ │ │ │ │ │ +/// │ │ │ │ │ │ +/// │ │ │ │ │ │ +/// │ └───────┘ │ ▼ ▼ +/// │ ...... │ ┌──────────────────────┐ +/// │ │ │X (Cartesian Product) │ +/// │ │ └──────────┬───────────┘ +/// └───────────┘ │ +/// │ +/// ▼ +/// ┌───────┬───────────────┐ +/// │ │ │ +/// │ │ │ +/// │ │ │ +/// └───────┴───────────────┘ +/// Intermediate Batch +/// (For join predicate evaluation) +/// ``` /// -/// #### 2. 
Probe phase -/// Sequentially polling batches from the probe-side input and processing them according to the -/// following logic: -/// - apply join filter (`ON` clause) to Cartesian product of probe batch and build side data -/// -- filter evaluation is executed once per build-side data row -/// - update shared bitmap of joined ("visited") build-side row indices, if required -- allows -/// to produce unmatched build-side data in case of e.g. LEFT/FULL JOIN after probing phase -/// completed -/// - perform join index alignment is required -- depending on `JoinType` -/// - produce output join batch +/// The execution follows a two-phase design: /// -/// Probing phase is executed in parallel, according to probe-side input partitioning -- one -/// thread per partition. After probe input is exhausted, each thread **ATTEMPTS** to produce -/// unmatched build-side data. +/// ## 1. Buffering Left Input +/// - The operator eagerly buffers all left-side input batches into memory, +/// until a memory limit is reached. +/// Currently, an out-of-memory error will be thrown if all the left-side input batches +/// cannot fit into memory at once. +/// In the future, it's possible to make this case finish execution. (see +/// 'Memory-limited Execution' section) +/// - The rationale for buffering the left side is that scanning the right side +/// can be expensive (e.g., decoding Parquet files), so buffering more left +/// rows reduces the number of right-side scan passes required. /// -/// #### 3. Producing unmatched build-side data -/// Producing unmatched build-side data as an output batch, after probe input is exhausted. 
-/// This step is also executed in parallel (once per probe input partition), and to avoid -/// duplicate output of unmatched data (due to shared nature build-side data), each thread -/// "reports" about probe phase completion (which means that "visited" bitmap won't be -/// updated anymore), and only the last thread, reporting about completion, will return output. +/// ## 2. Probing Right Input +/// - Right-side input is streamed batch by batch. +/// - For each right-side batch: +/// - It evaluates the join filter against the full buffered left input. +/// This results in a Cartesian product between the right batch and each +/// left row -- with the join predicate/filter applied -- for each inner +/// loop iteration. +/// - Matched results are accumulated into an output buffer. (see more in +/// `Output Buffering Strategy` section) +/// - This process continues until all right-side input is consumed. /// -/// # Clone / Shared State +/// # Producing unmatched build-side data +/// - For special join types like left/full joins, it's required to also output +/// unmatched pairs. During execution, bitmaps are kept for both left and right +/// sides of the input; they'll be handled by dedicated states in `NLJStream`. +/// - The final output of the left side unmatched rows is handled by a single +/// partition for simplicity, since it only counts a small portion of the +/// execution time. (e.g. if probe side has 10k rows, the final output of +/// unmatched build side only roughly counts for 1/10k of the total time) +/// +/// # Output Buffering Strategy +/// The operator uses an intermediate output buffer to accumulate results. Once +/// the output threshold is reached (currently set to the same value as +/// `batch_size` in the configuration), the results will be eagerly output. +/// +/// # Extra Notes +/// - The operator always considers the **left** side as the build (buffered) side. +/// Therefore, the physical optimizer should assign the smaller input to the left. 
+/// - The design tries to minimize the intermediate data size to approximately +/// 1 batch, for better cache locality and memory efficiency. +/// +/// # TODO: Memory-limited Execution +/// If the memory budget is exceeded during left-side buffering, fallback +/// strategies such as streaming left batches and re-scanning the right side +/// may be implemented in the future. +/// +/// Tracking issue: /// +/// # Clone / Shared State /// Note this structure includes a [`OnceAsync`] that is used to coordinate the /// loading of the left side with the processing in each output stream. /// Therefore it can not be [`Clone`] @@ -169,8 +180,8 @@ pub struct NestedLoopJoinExec { /// This structure is *shared* across all output streams. /// /// Each output stream waits on the `OnceAsync` to signal the completion of - /// the hash table creation. - inner_table: OnceAsync, + /// the build(left) side data, and buffer them all for later joining. + build_side_data: OnceAsync, /// Information of index and left / right placement of columns column_indices: Vec, /// Projection to apply to the output of the join @@ -211,7 +222,7 @@ impl NestedLoopJoinExec { filter, join_type: *join_type, join_schema, - inner_table: Default::default(), + build_side_data: Default::default(), column_indices, projection, metrics: Default::default(), @@ -307,29 +318,9 @@ impl NestedLoopJoinExec { )) } - /// Returns a vector indicating whether the left and right inputs maintain their order. - /// The first element corresponds to the left input, and the second to the right. - /// - /// The left (build-side) input's order may change, but the right (probe-side) input's - /// order is maintained for INNER, RIGHT, RIGHT ANTI, and RIGHT SEMI joins. - /// - /// Maintaining the right input's order helps optimize the nodes down the pipeline - /// (See [`ExecutionPlan::maintains_input_order`]). 
- /// - /// This is a separate method because it is also called when computing properties, before - /// a [`NestedLoopJoinExec`] is created. It also takes [`JoinType`] as an argument, as - /// opposed to `Self`, for the same reason. - fn maintains_input_order(join_type: JoinType) -> Vec { - vec![ - false, - matches!( - join_type, - JoinType::Inner - | JoinType::Right - | JoinType::RightAnti - | JoinType::RightSemi - ), - ] + /// This join implementation does not preserve the input order of either side. + fn maintains_input_order(_join_type: JoinType) -> Vec { + vec![false, false] } pub fn contains_projection(&self) -> bool { @@ -499,7 +490,7 @@ impl ExecutionPlan for NestedLoopJoinExec { MemoryConsumer::new(format!("NestedLoopJoinLoad[{partition}]")) .register(context.memory_pool()); - let inner_table = self.inner_table.try_once(|| { + let build_side_data = self.build_side_data.try_once(|| { let stream = self.left.execute(0, Arc::clone(&context))?; Ok(collect_left_input( @@ -513,13 +504,7 @@ impl ExecutionPlan for NestedLoopJoinExec { let batch_size = context.session_config().batch_size(); - let outer_table = self.right.execute(partition, context)?; - - let indices_cache = (UInt64Array::new_null(0), UInt32Array::new_null(0)); - - // Right side has an order and it is maintained during operation. 
- let right_side_ordered = - self.maintains_input_order()[1] && self.right.output_ordering().is_some(); + let probe_side_data = self.right.execute(partition, context)?; // update column indices to reflect the projection let column_indices_after_projection = match &self.projection { @@ -530,21 +515,16 @@ impl ExecutionPlan for NestedLoopJoinExec { None => self.column_indices.clone(), }; - Ok(Box::pin(NestedLoopJoinStream { - schema: self.schema(), - filter: self.filter.clone(), - join_type: self.join_type, - outer_table, - inner_table, - column_indices: column_indices_after_projection, + Ok(Box::pin(NestedLoopJoinStream::new( + self.schema(), + self.filter.clone(), + self.join_type, + probe_side_data, + build_side_data, + column_indices_after_projection, join_metrics, - indices_cache, - right_side_ordered, - state: NestedLoopJoinStreamState::WaitBuildSide, - left_data: None, - join_result_status: None, - intermediate_batch_size: batch_size, - })) + batch_size, + ))) } fn metrics(&self) -> Option { @@ -607,6 +587,57 @@ impl ExecutionPlan for NestedLoopJoinExec { } } +impl EmbeddedProjection for NestedLoopJoinExec { + fn with_projection(&self, projection: Option>) -> Result { + self.with_projection(projection) + } +} + +/// Left (build-side) data +pub(crate) struct JoinLeftData { + /// Build-side data collected to single batch + batch: RecordBatch, + /// Shared bitmap builder for visited left indices + bitmap: SharedBitmapBuilder, + /// Counter of running probe-threads, potentially able to update `bitmap` + probe_threads_counter: AtomicUsize, + /// Memory reservation for tracking batch and bitmap + /// Cleared on `JoinLeftData` drop + /// reservation is cleared on Drop + #[expect(dead_code)] + reservation: MemoryReservation, +} + +impl JoinLeftData { + pub(crate) fn new( + batch: RecordBatch, + bitmap: SharedBitmapBuilder, + probe_threads_counter: AtomicUsize, + reservation: MemoryReservation, + ) -> Self { + Self { + batch, + bitmap, + probe_threads_counter, + 
reservation, + } + } + + pub(crate) fn batch(&self) -> &RecordBatch { + &self.batch + } + + pub(crate) fn bitmap(&self) -> &SharedBitmapBuilder { + &self.bitmap + } + + /// Decrements counter of running threads, and returns `true` + /// if caller is the last running thread + pub(crate) fn report_probe_completed(&self) -> bool { + self.probe_threads_counter.fetch_sub(1, Ordering::Relaxed) == 1 + } +} + /// Asynchronously collect input into a single batch, and creates `JoinLeftData` from it async fn collect_left_input( stream: SendableRecordBatchStream, @@ -660,529 +691,1204 @@ async fn collect_left_input( )) } -/// This enumeration represents various states of the nested loop join algorithm. -#[derive(Debug, Clone)] -enum NestedLoopJoinStreamState { - /// The initial state, indicating that build-side data not collected yet - WaitBuildSide, - /// Indicates that build-side has been collected, and stream is ready for - /// fetching probe-side - FetchProbeBatch, - /// Indicates that a non-empty batch has been fetched from probe-side, and - /// is ready to be processed - ProcessProbeBatch(RecordBatch), - /// Preparation phase: Gathers the indices of unmatched rows from the build-side. - /// This state is entered for join types that emit unmatched build-side rows - /// (e.g., LEFT and FULL joins) after the entire probe-side input has been consumed. - PrepareUnmatchedBuildRows, - /// Output unmatched build-side rows. - /// The indices for rows to output has already been calculated in the previous - /// `PrepareUnmatchedBuildRows` state. In this state the final batch will be materialized incrementally. - // The inner `RecordBatch` is an empty dummy batch used to get right schema. - OutputUnmatchedBuildRows(RecordBatch), - /// Indicates that NestedLoopJoinStream execution is completed - Completed, -} - -impl NestedLoopJoinStreamState { - /// Tries to extract a `ProcessProbeBatchState` from the - /// `NestedLoopJoinStreamState` enum. 
Returns an error if state is not - /// `ProcessProbeBatchState`. - fn try_as_process_probe_batch(&mut self) -> Result<&RecordBatch> { - match self { - NestedLoopJoinStreamState::ProcessProbeBatch(state) => Ok(state), - _ => internal_err!("Expected join stream in ProcessProbeBatch state"), - } - } -} - -/// Tracks incremental output of join result batches. -/// -/// Initialized with all matching pairs that satisfy the join predicate. -/// Pairs are stored as indices in `build_indices` and `probe_indices` -/// Each poll outputs a batch within the configured size limit and updates -/// processed_count until all pairs are consumed. -/// -/// Example: 5000 matches, batch size limit is 100 -/// - Poll 1: output batch[0..100], processed_count = 100 -/// - Poll 2: output batch[100..200], processed_count = 200 -/// - ...continues until processed_count = 5000 -struct JoinResultProgress { - /// Row indices from build-side table (left table). - build_indices: PrimitiveArray, - /// Row indices from probe-side table (right table). - probe_indices: PrimitiveArray, - /// Number of index pairs already processed into output batches. - /// We have completed join result for indices [0..processed_count). - processed_count: usize, +/// States for join processing. See `poll_next()` comment for more details about +/// state transitions. +#[derive(Debug, Clone, Copy)] +enum NLJState { + BufferingLeft, + FetchingRight, + ProbeRight, + EmitRightUnmatched, + EmitLeftUnmatched, + Done, } - -/// A stream that issues [RecordBatch]es as they arrive from the right of the join. -struct NestedLoopJoinStream { - /// Input schema - schema: Arc, +pub(crate) struct NestedLoopJoinStream { + // ======================================================================== + // PROPERTIES: + // Operator's properties that remain constant + // + // Note: The implementation uses the terms left/build-side table and + // right/probe-side table interchangeably. 
Treating the left side as the + // build side is a convention in DataFusion: the planner always tries to + // swap the smaller table to the left side. + // ======================================================================== + /// Output schema + pub(crate) output_schema: Arc, /// join filter - filter: Option, + pub(crate) join_filter: Option, /// type of the join - join_type: JoinType, - /// the outer table data of the nested loop join - outer_table: SendableRecordBatchStream, - /// the inner table data of the nested loop join - inner_table: OnceFut, - /// Information of index and left / right placement of columns - column_indices: Vec, - // TODO: support null aware equal - // null_equality: NullEquality, + pub(crate) join_type: JoinType, + /// the probe-side(right) table data of the nested loop join + pub(crate) right_data: SendableRecordBatchStream, + /// the build-side table data of the nested loop join + pub(crate) left_data: OnceFut, + /// Projection to construct the output schema from the left and right tables. + /// Example: + /// - output_schema: ['a', 'c'] + /// - left_schema: ['a', 'b'] + /// - right_schema: ['c'] + /// + /// The column indices would be [(left, 0), (right, 0)] -- taking the left + /// 0th column and right 0th column can construct the output schema. + /// + /// Note there are other columns ('b' in the example) still kept after + /// projection pushdown; this is because they might be used to evaluate + /// the join filter (e.g., `JOIN ON (b+c)>0`). + pub(crate) column_indices: Vec, /// Join execution metrics - join_metrics: BuildProbeJoinMetrics, - /// Cache for join indices calculations - indices_cache: (UInt64Array, UInt32Array), - /// Whether the right side is ordered - right_side_ordered: bool, - /// Current state of the stream - state: NestedLoopJoinStreamState, - /// Result of the left data future - left_data: Option>, - - /// Tracks progress when building join result batches incrementally. 
- join_result_status: Option, - - intermediate_batch_size: usize, + pub(crate) join_metrics: BuildProbeJoinMetrics, + + /// `batch_size` from configuration + batch_size: usize, + + /// See comments in [`need_produce_right_in_final`] for more detail + should_track_unmatched_right: bool, + + // ======================================================================== + // STATE FLAGS/BUFFERS: + // Fields that hold intermediate data/flags during execution + // ======================================================================== + /// State Tracking + state: NLJState, + /// Output buffer holds the join result to output. It will emit eagerly when + /// the threshold is reached. + output_buffer: Box, + /// See comments in [`NLJState::Done`] for its purpose + handled_empty_output: bool, + + // Buffer(left) side + // ----------------- + /// The current buffered left data to join + buffered_left_data: Option>, + /// Index into the left buffered batch. Used in `ProbeRight` state + left_probe_idx: usize, + /// Index into the left buffered batch. Used in `EmitLeftUnmatched` state + left_emit_idx: usize, + /// Should we go back to `BufferingLeft` state again after `EmitLeftUnmatched` + /// state is over. + left_exhausted: bool, + /// If we can buffer all left data in one pass + /// TODO(now): this is for the (unimplemented) memory-limited execution + #[allow(dead_code)] + left_buffered_in_one_pass: bool, + + // Probe(right) side + // ----------------- + /// The current probe batch to process + current_right_batch: Option, + // For right join, keep track of matched rows in `current_right_batch` + // Constructed when fetching each new incoming right batch in `FetchingRight` state. + current_right_batch_matched: Option, } -/// Creates a Cartesian product of two input batches, preserving the order of the right batch, -/// and applying a join filter if provided. 
-/// -/// # Example -/// Input: -/// left = [0, 1], right = [0, 1, 2] -/// -/// Output: -/// left_indices = [0, 1, 0, 1, 0, 1], right_indices = [0, 0, 1, 1, 2, 2] -/// -/// Input: -/// left = [0, 1, 2], right = [0, 1, 2, 3], filter = left.a != right.a -/// -/// Output: -/// left_indices = [1, 2, 0, 2, 0, 1, 0, 1, 2], right_indices = [0, 0, 1, 1, 2, 2, 3, 3, 3] -fn build_join_indices( - left_batch: &RecordBatch, - right_batch: &RecordBatch, - filter: Option<&JoinFilter>, - indices_cache: &mut (UInt64Array, UInt32Array), - max_intermediate_batch_size: usize, -) -> Result<(UInt64Array, UInt32Array)> { - let left_row_count = left_batch.num_rows(); - let right_row_count = right_batch.num_rows(); - let output_row_count = left_row_count * right_row_count; - - // We always use the same indices before applying the filter, so we can cache them - let (left_indices_cache, right_indices_cache) = indices_cache; - let cached_output_row_count = left_indices_cache.len(); - - let (left_indices, right_indices) = - match output_row_count.cmp(&cached_output_row_count) { - std::cmp::Ordering::Equal => { - // Reuse the cached indices - (left_indices_cache.clone(), right_indices_cache.clone()) - } - std::cmp::Ordering::Less => { - // Left_row_count never changes because it's the build side. The changes to the - // right_row_count can be handled trivially by taking the first output_row_count - // elements of the cache because of how the indices are generated. - // (See the Ordering::Greater match arm) - ( - left_indices_cache.slice(0, output_row_count), - right_indices_cache.slice(0, output_row_count), - ) - } - std::cmp::Ordering::Greater => { - // Rebuild the indices cache +impl Stream for NestedLoopJoinStream { + type Item = Result; - // Produces 0, 1, 2, 0, 1, 2, 0, 1, 2, ... - *left_indices_cache = UInt64Array::from_iter_values( - (0..output_row_count as u64).map(|i| i % left_row_count as u64), - ); + /// See the comments [`NestedLoopJoinExec`] for high-level design ideas. 
+ /// + /// # Implementation + /// + /// This function is the entry point of NLJ operator's state machine + /// transitions. The rough state transition graph is as follows, for more + /// details see the comment in each state's matching arm. + /// + /// ============================ + /// State transition graph: + /// ============================ + /// + /// (start) --> BufferingLeft + /// ---------------------------- + /// BufferingLeft → FetchingRight + /// + /// FetchingRight → ProbeRight (if right batch available) + /// FetchingRight → EmitLeftUnmatched (if right exhausted) + /// + /// ProbeRight → ProbeRight (next left row or after yielding output) + /// ProbeRight → EmitRightUnmatched (for special join types like right join) + /// ProbeRight → FetchingRight (done with the current right batch) + /// + /// EmitRightUnmatched → FetchingRight + /// + /// EmitLeftUnmatched → EmitLeftUnmatched (only process 1 chunk for each + /// iteration) + /// EmitLeftUnmatched → Done (if finished) + /// ---------------------------- + /// Done → (end) + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + loop { + match self.state { + // # NLJState transitions + // --> FetchingRight + // This state will prepare the left side batches, next state + // `FetchingRight` is responsible for preparing a single probe + // side batch, before joining starts. + NLJState::BufferingLeft => { + debug!("[NLJState] Entering: {:?}", self.state); + // inside `collect_left_input` (the routine to buffer + // build-side batches), related metrics except build time will be + // updated. + // stop on drop + let build_metric = self.join_metrics.build_time.clone(); + let _build_timer = build_metric.timer(); + + match self.handle_buffering_left(cx) { + ControlFlow::Continue(()) => continue, + ControlFlow::Break(poll) => return poll, + } + } - // Produces 0, 0, 0, 1, 1, 1, 2, 2, 2, ...
- *right_indices_cache = UInt32Array::from_iter_values( - (0..output_row_count as u32).map(|i| i / left_row_count as u32), - ); + // # NLJState transitions: + // 1. --> ProbeRight + // Start processing the join for the newly fetched right + // batch. + // 2. --> EmitLeftUnmatched: When the right side input is exhausted, (maybe) emit + // unmatched left side rows. + // + // After fetching a new batch from the right side, it will + // process all rows from the buffered left data: + // ```text + // for batch in right_side: + // for row in left_buffer: + // join(batch, row) + // ``` + // Note: the implementation does this step incrementally, + // instead of materializing all intermediate Cartesian products + // at once in memory. + // + // So after the right side input is exhausted, the join phase + // for the current buffered left data is finished. We can go to + // the next `EmitLeftUnmatched` phase to check if there is any + // special handling (e.g., in cases like left join). + NLJState::FetchingRight => { + debug!("[NLJState] Entering: {:?}", self.state); + // stop on drop + let join_metric = self.join_metrics.join_time.clone(); + let _join_timer = join_metric.timer(); + + match self.handle_fetching_right(cx) { + ControlFlow::Continue(()) => continue, + ControlFlow::Break(poll) => return poll, + } + } + + // NLJState transitions: + // 1. --> ProbeRight(1) + // If we have already buffered enough output to yield, it + // will first give back control to the parent state machine, + // then resume at the same place. + // 2. --> ProbeRight(2) + // After probing one right batch, and evaluating the + // join filter on (left-row x right-batch), it will advance + // to the next left row, then re-enter the current state and + // continue joining. + // 3. --> FetchingRight + // After it has done with the current right batch (to join + // with all rows in the left buffer), it will go to + // FetchingRight state to check what to do next.
+ NLJState::ProbeRight => { + debug!("[NLJState] Entering: {:?}", self.state); + + // stop on drop + let join_metric = self.join_metrics.join_time.clone(); + let _join_timer = join_metric.timer(); + + match self.handle_probe_right() { + ControlFlow::Continue(()) => continue, + ControlFlow::Break(poll) => { + return self.join_metrics.baseline.record_poll(poll) + } + } + } - (left_indices_cache.clone(), right_indices_cache.clone()) + // In the `current_right_batch_matched` bitmap, all trues mean + // it has been output by the join. In this state we have to + // output unmatched rows for current right batch (with null + // padding for left relation) + // Precondition: we have checked the join type so that it's + // possible to output right unmatched (e.g. it's right join) + NLJState::EmitRightUnmatched => { + debug!("[NLJState] Entering: {:?}", self.state); + + // stop on drop + let join_metric = self.join_metrics.join_time.clone(); + let _join_timer = join_metric.timer(); + + match self.handle_emit_right_unmatched() { + ControlFlow::Continue(()) => continue, + ControlFlow::Break(poll) => { + return self.join_metrics.baseline.record_poll(poll) + } + } + } + + // NLJState transitions: + // 1. --> EmitLeftUnmatched(1) + // If we have already buffered enough output to yield, it + // will first give back control to the parent state machine, + // then resume at the same place. + // 2. --> EmitLeftUnmatched(2) + // After processing some unmatched rows, it will re-enter + // the same state, to check if there are any more final + // results to output. + // 3. --> Done + // It has processed all data, go to the final state and ready + // to exit. + // + // TODO: For memory-limited case, go back to `BufferingLeft` + // state again. 
+ NLJState::EmitLeftUnmatched => { + debug!("[NLJState] Entering: {:?}", self.state); + + // stop on drop + let join_metric = self.join_metrics.join_time.clone(); + let _join_timer = join_metric.timer(); + + match self.handle_emit_left_unmatched() { + ControlFlow::Continue(()) => continue, + ControlFlow::Break(poll) => { + return self.join_metrics.baseline.record_poll(poll) + } + } + } + + // The final state and the exit point + NLJState::Done => { + debug!("[NLJState] Entering: {:?}", self.state); + + // stop on drop + let join_metric = self.join_metrics.join_time.clone(); + let _join_timer = join_metric.timer(); + // counting it in join timer because there might be some + // final result batches to output in this state + + let poll = self.handle_done(); + return self.join_metrics.baseline.record_poll(poll); + } } - }; + } + } +} - if let Some(filter) = filter { - apply_join_filter_to_indices( - left_batch, - right_batch, - left_indices, - right_indices, - filter, - JoinSide::Left, - Some(max_intermediate_batch_size), - ) - } else { - Ok((left_indices, right_indices)) +impl RecordBatchStream for NestedLoopJoinStream { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.output_schema) } } impl NestedLoopJoinStream { - fn poll_next_impl( + #[allow(clippy::too_many_arguments)] + pub(crate) fn new( + schema: Arc, + filter: Option, + join_type: JoinType, + right_data: SendableRecordBatchStream, + left_data: OnceFut, + column_indices: Vec, + join_metrics: BuildProbeJoinMetrics, + batch_size: usize, + ) -> Self { + Self { + output_schema: Arc::clone(&schema), + join_filter: filter, + join_type, + right_data, + column_indices, + left_data, + join_metrics, + buffered_left_data: None, + output_buffer: Box::new(BatchCoalescer::new(schema, batch_size)), + batch_size, + current_right_batch: None, + current_right_batch_matched: None, + state: NLJState::BufferingLeft, + left_probe_idx: 0, + left_emit_idx: 0, + left_exhausted: false, + left_buffered_in_one_pass: true, +
handled_empty_output: false, + should_track_unmatched_right: need_produce_right_in_final(join_type), + } + } + + // ==== State handler functions ==== + + /// Handle BufferingLeft state - prepare left side batches + fn handle_buffering_left( &mut self, cx: &mut std::task::Context<'_>, - ) -> Poll>> { - loop { - return match self.state { - NestedLoopJoinStreamState::WaitBuildSide => { - handle_state!(ready!(self.collect_build_side(cx))) - } - NestedLoopJoinStreamState::FetchProbeBatch => { - handle_state!(ready!(self.fetch_probe_batch(cx))) - } - NestedLoopJoinStreamState::ProcessProbeBatch(_) => { - let poll = handle_state!(self.process_probe_batch()); - self.join_metrics.baseline.record_poll(poll) + ) -> ControlFlow>>> { + match self.left_data.get_shared(cx) { + Poll::Ready(Ok(left_data)) => { + self.buffered_left_data = Some(left_data); + // TODO: implement memory-limited case + self.left_exhausted = true; + self.state = NLJState::FetchingRight; + // Continue to next state immediately + ControlFlow::Continue(()) + } + Poll::Ready(Err(e)) => ControlFlow::Break(Poll::Ready(Some(Err(e)))), + Poll::Pending => ControlFlow::Break(Poll::Pending), + } + } + + /// Handle FetchingRight state - fetch next right batch and prepare for processing + fn handle_fetching_right( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> ControlFlow>>> { + match self.right_data.poll_next_unpin(cx) { + Poll::Ready(result) => match result { + Some(Ok(right_batch)) => { + // Update metrics + let right_batch_size = right_batch.num_rows(); + self.join_metrics.input_rows.add(right_batch_size); + self.join_metrics.input_batches.add(1); + + // Skip the empty batch + if right_batch_size == 0 { + return ControlFlow::Continue(()); + } + + self.current_right_batch = Some(right_batch); + + // Prepare right bitmap + if self.should_track_unmatched_right { + let zeroed_buf = BooleanBuffer::new_unset(right_batch_size); + self.current_right_batch_matched = + Some(BooleanArray::new(zeroed_buf, None)); + } 
+ + self.left_probe_idx = 0; + self.state = NLJState::ProbeRight; + ControlFlow::Continue(()) } - NestedLoopJoinStreamState::PrepareUnmatchedBuildRows => { - handle_state!(self.prepare_unmatched_output_indices()) + Some(Err(e)) => ControlFlow::Break(Poll::Ready(Some(Err(e)))), + None => { + // Right stream exhausted + self.state = NLJState::EmitLeftUnmatched; + ControlFlow::Continue(()) } - NestedLoopJoinStreamState::OutputUnmatchedBuildRows(_) => { - let poll = handle_state!(self.build_unmatched_output()); - self.join_metrics.baseline.record_poll(poll) + }, + Poll::Pending => ControlFlow::Break(Poll::Pending), + } + } + + /// Handle ProbeRight state - process current probe batch + fn handle_probe_right(&mut self) -> ControlFlow>>> { + // Return any completed batches first + if let Some(poll) = self.maybe_flush_ready_batch() { + return ControlFlow::Break(poll); + } + + // Process current probe state + match self.process_probe_batch() { + // State unchanged (ProbeRight) + // Continue probing until we have done joining the + // current right batch with all buffered left rows. + Ok(true) => ControlFlow::Continue(()), + // To next FetchRightState + // We have finished joining + // (cur_right_batch x buffered_left_batches) + Ok(false) => { + // Left exhausted, transition to FetchingRight + self.left_probe_idx = 0; + if self.should_track_unmatched_right { + debug_assert!( + self.current_right_batch_matched.is_some(), + "If it's required to track matched rows in the right input, the right bitmap must be present" + ); + self.state = NLJState::EmitRightUnmatched; + } else { + self.current_right_batch = None; + self.state = NLJState::FetchingRight; } - NestedLoopJoinStreamState::Completed => Poll::Ready(None), - }; + ControlFlow::Continue(()) + } + Err(e) => ControlFlow::Break(Poll::Ready(Some(Err(e)))), } } - // This function's main job is to construct an output `RecordBatch` based on pre-calculated join indices. 
- // It operates in a chunk-based manner, meaning it processes a portion of the results in each call, - // making it suitable for streaming large datasets without high memory consumption. - // This function behaves like an iterator. It returns `Ok(None)` - // to signal that the result stream is exhausted and there is no more data. - fn get_next_join_result(&mut self) -> Result> { - let status = self.join_result_status.as_mut().ok_or_else(|| { - internal_datafusion_err!( - "get_next_join_result called without initializing join_result_status" - ) - })?; + /// Handle EmitRightUnmatched state - emit unmatched right rows + fn handle_emit_right_unmatched( + &mut self, + ) -> ControlFlow>>> { + // Return any completed batches first + if let Some(poll) = self.maybe_flush_ready_batch() { + return ControlFlow::Break(poll); + } - let (left_indices, right_indices, current_start) = ( - &status.build_indices, - &status.probe_indices, - status.processed_count, + debug_assert!( + self.current_right_batch_matched.is_some() + && self.current_right_batch.is_some(), + "This state is yielding output for unmatched rows in the current right batch, so both the right batch and the bitmap must be present" ); - let left_batch = self - .left_data - .as_ref() - .ok_or_else(|| internal_datafusion_err!("should have left_batch"))? 
- .batch(); - - let right_batch = match &self.state { - NestedLoopJoinStreamState::ProcessProbeBatch(record_batch) - | NestedLoopJoinStreamState::OutputUnmatchedBuildRows(record_batch) => { - record_batch + // Construct the result batch for unmatched right rows using a utility function + match self.process_right_unmatched() { + Ok(Some(batch)) => { + match self.output_buffer.push_batch(batch) { + Ok(()) => { + // Processed all in one pass + // cleared inside `process_right_unmatched` + debug_assert!(self.current_right_batch.is_none()); + self.state = NLJState::FetchingRight; + ControlFlow::Continue(()) + } + Err(e) => ControlFlow::Break(Poll::Ready(Some(arrow_err!(e)))), + } } - _ => { - return internal_err!( - "State should be ProcessProbeBatch or OutputUnmatchedBuildRows" - ) + Ok(None) => { + // Processed all in one pass + // cleared inside `process_right_unmatched` + debug_assert!(self.current_right_batch.is_none()); + self.state = NLJState::FetchingRight; + ControlFlow::Continue(()) } - }; + Err(e) => ControlFlow::Break(Poll::Ready(Some(Err(e)))), + } + } - if left_indices.is_empty() && right_indices.is_empty() && current_start == 0 { - // To match the behavior of the previous implementation, return an empty RecordBatch. 
- let res = RecordBatch::new_empty(Arc::clone(&self.schema)); - status.processed_count = 1; - return Ok(Some(res)); + /// Handle EmitLeftUnmatched state - emit unmatched left rows + fn handle_emit_left_unmatched( + &mut self, + ) -> ControlFlow>>> { + // Return any completed batches first + if let Some(poll) = self.maybe_flush_ready_batch() { + return ControlFlow::Break(poll); } - if matches!(self.join_type, JoinType::RightSemi | JoinType::RightAnti) { - // in this case left_indices.num_rows() == 0 - let end = min( - current_start + self.intermediate_batch_size, - right_indices.len(), - ); + // Process current unmatched state + match self.process_left_unmatched() { + // State unchanged (EmitLeftUnmatched) + // Continue processing until we have processed all unmatched rows + Ok(true) => ControlFlow::Continue(()), + // To Done state + // We have finished processing all unmatched rows + Ok(false) => match self.output_buffer.finish_buffered_batch() { + Ok(()) => { + self.state = NLJState::Done; + ControlFlow::Continue(()) + } + Err(e) => ControlFlow::Break(Poll::Ready(Some(arrow_err!(e)))), + }, + Err(e) => ControlFlow::Break(Poll::Ready(Some(Err(e)))), + } + } - if current_start >= end { - return Ok(None); + /// Handle Done state - final state processing + fn handle_done(&mut self) -> Poll>> { + // Return any remaining completed batches before final termination + if let Some(poll) = self.maybe_flush_ready_batch() { + return poll; + } + + // HACK for the doc test in https://github.com/apache/datafusion/blob/main/datafusion/core/src/dataframe/mod.rs#L1265 + // If this operator directly return `Poll::Ready(None)` + // for empty result, the final result will become an empty + // batch with empty schema, however the expected result + // should be with the expected schema for this operator + if !self.handled_empty_output { + let zero_count = Count::new(); + if *self.join_metrics.baseline.output_rows() == zero_count { + let empty_batch = 
RecordBatch::new_empty(Arc::clone(&self.output_schema)); + self.handled_empty_output = true; + return Poll::Ready(Some(Ok(empty_batch))); } + } - let res = Some(build_batch_from_indices( - &self.schema, - left_batch, - right_batch, - left_indices, - &right_indices.slice(current_start, end - current_start), - &self.column_indices, - JoinSide::Left, - )?); + Poll::Ready(None) + } + + // ==== Core logic handling for each state ==== + + /// Returns bool to indicate should it continue probing + /// true -> continue in the same ProbeRight state + /// false -> It has done with the (buffered_left x cur_right_batch), go to + /// next state (EmitRightUnmatched or FetchingRight) + fn process_probe_batch(&mut self) -> Result { + let left_data = Arc::clone(self.get_left_data()?); + let right_batch = self + .current_right_batch + .as_ref() + .ok_or_else(|| internal_datafusion_err!("Right batch should be available"))? + .clone(); - status.processed_count = end; - return Ok(res); + // stop probing, the caller will go to the next state + if self.left_probe_idx >= left_data.batch().num_rows() { + return Ok(false); } - if current_start >= left_indices.len() { - return Ok(None); + // ======== + // Join (l_row x right_batch) + // and push the result into output_buffer + // ======== + + let l_idx = self.left_probe_idx; + let join_batch = + self.process_single_left_row_join(&left_data, &right_batch, l_idx)?; + + if let Some(batch) = join_batch { + self.output_buffer.push_batch(batch)?; } - let end = min( - current_start + self.intermediate_batch_size, - left_indices.len(), - ); + // ==== Prepare for the next iteration ==== + + // Advance left cursor + self.left_probe_idx += 1; + + // Return true to continue probing + Ok(true) + } - let left_indices = &left_indices.slice(current_start, end - current_start); - let right_indices = &right_indices.slice(current_start, end - current_start); + /// Process a single left row join with the current right batch.
+ /// Returns a RecordBatch containing the join results (None if empty) + fn process_single_left_row_join( + &mut self, + left_data: &JoinLeftData, + right_batch: &RecordBatch, + l_index: usize, + ) -> Result> { + let right_row_count = right_batch.num_rows(); + if right_row_count == 0 { + return Ok(None); + } - // Switch around the build side and probe side for `JoinType::RightMark` - // because in a RightMark join, we want to mark rows on the right table - // by looking for matches in the left. - let res = if self.join_type == JoinType::RightMark { - build_batch_from_indices( - &self.schema, + let cur_right_bitmap = if let Some(filter) = &self.join_filter { + apply_filter_to_row_join_batch( + left_data.batch(), + l_index, right_batch, - left_batch, - left_indices, - right_indices, - &self.column_indices, - JoinSide::Right, - ) + filter, + )? } else { - build_batch_from_indices( - &self.schema, - left_batch, + BooleanArray::from(vec![true; right_row_count]) + }; + + self.update_matched_bitmap(l_index, &cur_right_bitmap)?; + + // For the following join types: here we only have to set the left/right + // bitmap, and no need to output result + if matches!( + self.join_type, + JoinType::LeftAnti + | JoinType::LeftSemi + | JoinType::LeftMark + | JoinType::RightAnti + | JoinType::RightMark + | JoinType::RightSemi + ) { + return Ok(None); + } + + if cur_right_bitmap.true_count() == 0 { + // If none of the pairs has passed the join predicate/filter + Ok(None) + } else { + // Use the optimized approach similar to build_intermediate_batch_for_single_left_row + let join_batch = build_row_join_batch( + &self.output_schema, + left_data.batch(), + l_index, right_batch, - left_indices, - right_indices, + Some(cur_right_bitmap), &self.column_indices, JoinSide::Left, - ) - }?; + )?; + Ok(join_batch) + } + } - status.processed_count = end; + /// Returns bool to indicate should it continue processing unmatched rows + /// true -> continue in the same EmitLeftUnmatched state + /// 
false -> next state (Done) + fn process_left_unmatched(&mut self) -> Result { + let left_data = self.get_left_data()?; + let left_batch = left_data.batch(); + + // ======== + // Check early return conditions + // ======== + + // Early return if join type can't have unmatched rows + let join_type_no_produce_left = !need_produce_result_in_final(self.join_type); + // Early return if another thread is already processing unmatched rows + let handled_by_other_partition = + self.left_emit_idx == 0 && !left_data.report_probe_completed(); + // Stop processing unmatched rows, the caller will go to the next state + let finished = self.left_emit_idx >= left_batch.num_rows(); + + if join_type_no_produce_left || handled_by_other_partition || finished { + return Ok(false); + } - Ok(Some(res)) - } + // ======== + // Process unmatched rows and push the result into output_buffer + // Each time, the number to process is up to batch size + // ======== + let start_idx = self.left_emit_idx; + let end_idx = std::cmp::min(start_idx + self.batch_size, left_batch.num_rows()); - fn collect_build_side( - &mut self, - cx: &mut std::task::Context<'_>, - ) -> Poll>>> { - let build_timer = self.join_metrics.build_time.timer(); - // build hash table from left (build) side, if not yet done - self.left_data = Some(ready!(self.inner_table.get_shared(cx))?); - build_timer.done(); + if let Some(batch) = + self.process_left_unmatched_range(left_data, start_idx, end_idx)? + { + self.output_buffer.push_batch(batch)?; + } - self.state = NestedLoopJoinStreamState::FetchProbeBatch; + // ==== Prepare for the next iteration ==== + self.left_emit_idx = end_idx; - Poll::Ready(Ok(StatefulStreamResult::Continue)) + // Return true to continue processing unmatched rows + Ok(true) } - /// Fetches next batch from probe-side + /// Process unmatched rows from the left data within the specified range. + /// Returns a RecordBatch containing the unmatched rows (None if empty). 
/// - /// If a non-empty batch has been fetched, updates state to - /// `ProcessProbeBatchState`, otherwise updates state to `ExhaustedProbeSide`. - fn fetch_probe_batch( - &mut self, - cx: &mut std::task::Context<'_>, - ) -> Poll>>> { - match ready!(self.outer_table.poll_next_unpin(cx)) { - None => { - self.state = NestedLoopJoinStreamState::PrepareUnmatchedBuildRows; - } - Some(Ok(right_batch)) => { - self.join_metrics.input_batches.add(1); - self.join_metrics.input_rows.add(right_batch.num_rows()); - - self.state = NestedLoopJoinStreamState::ProcessProbeBatch(right_batch); - } - Some(Err(err)) => return Poll::Ready(Err(err)), - }; + /// # Arguments + /// * `left_data` - The left side data containing the batch and bitmap + /// * `start_idx` - Start index (inclusive) of the range to process + /// * `end_idx` - End index (exclusive) of the range to process + /// + /// # Safety + /// The caller is responsible for ensuring that `start_idx` and `end_idx` are + /// within valid bounds of the left batch. This function does not perform + /// bounds checking. + fn process_left_unmatched_range( + &self, + left_data: &JoinLeftData, + start_idx: usize, + end_idx: usize, + ) -> Result> { + if start_idx == end_idx { + return Ok(None); + } - Poll::Ready(Ok(StatefulStreamResult::Continue)) + // Slice both left batch, and bitmap to range [start_idx, end_idx) + // The range is bit index (not byte) + let left_batch = left_data.batch(); + let left_batch_sliced = left_batch.slice(start_idx, end_idx - start_idx); + + // Can this be more efficient? 
+ let mut bitmap_sliced = BooleanBufferBuilder::new(end_idx - start_idx); + bitmap_sliced.append_n(end_idx - start_idx, false); + let bitmap = left_data.bitmap().lock(); + for i in start_idx..end_idx { + assert!( + i - start_idx < bitmap_sliced.capacity(), + "DBG: {start_idx}, {end_idx}" + ); + bitmap_sliced.set_bit(i - start_idx, bitmap.get_bit(i)); + } + let bitmap_sliced = BooleanArray::new(bitmap_sliced.finish(), None); + + build_unmatched_batch( + Arc::clone(&self.output_schema), + &left_batch_sliced, + bitmap_sliced, + self.right_data.schema(), + &self.column_indices, + self.join_type, + JoinSide::Left, + ) } - /// Joins current probe batch with build-side data and produces batch with - /// matched output, updates state to `FetchProbeBatch`. - fn process_probe_batch( - &mut self, - ) -> Result>> { - let Some(left_data) = self.left_data.clone() else { - return internal_err!( - "Expected left_data to be Some in ProcessProbeBatch state" - ); - }; - let visited_left_side = left_data.bitmap(); - let batch = self.state.try_as_process_probe_batch()?; + /// Process unmatched rows from the current right batch and reset the bitmap. + /// Returns a RecordBatch containing the unmatched right rows (None if empty). 
+ fn process_right_unmatched(&mut self) -> Result> { + // ==== Take current right batch and its bitmap ==== + let right_batch_bitmap: BooleanArray = + std::mem::take(&mut self.current_right_batch_matched).ok_or_else(|| { + internal_datafusion_err!("right bitmap should be available") + })?; + + let right_batch = self.current_right_batch.take(); + let cur_right_batch = unwrap_or_internal_err!(right_batch); + + let left_data = self.get_left_data()?; + let left_schema = left_data.batch().schema(); + + let res = build_unmatched_batch( + Arc::clone(&self.output_schema), + &cur_right_batch, + right_batch_bitmap, + left_schema, + &self.column_indices, + self.join_type, + JoinSide::Right, + ); - let binding = self.join_metrics.join_time.clone(); - let _timer = binding.timer(); + // ==== Clean-up ==== + self.current_right_batch_matched = None; - if self.join_result_status.is_none() { - let (left_side_indices, right_side_indices) = join_left_and_right_batch( - left_data.batch(), - batch, - self.join_type, - self.filter.as_ref(), - visited_left_side, - &mut self.indices_cache, - self.right_side_ordered, - self.intermediate_batch_size, - )?; - self.join_result_status = Some(JoinResultProgress { - build_indices: left_side_indices, - probe_indices: right_side_indices, - processed_count: 0, - }) - } + res + } - let join_result = self.get_next_join_result()?; + // ==== Utilities ==== - match join_result { - Some(res) => { - self.join_metrics.output_batches.add(1); - Ok(StatefulStreamResult::Ready(Some(res))) - } - None => { - self.state = NestedLoopJoinStreamState::FetchProbeBatch; - self.join_result_status = None; - Ok(StatefulStreamResult::Continue) - } - } + /// Get the build-side data of the left input, errors if it's None + fn get_left_data(&self) -> Result<&Arc> { + self.buffered_left_data + .as_ref() + .ok_or_else(|| internal_datafusion_err!("LeftData should be available")) } - fn build_unmatched_output( - &mut self, - ) -> Result>> { - let binding = 
self.join_metrics.join_time.clone(); - let _timer = binding.timer(); - - let res = self.get_next_join_result()?; - match res { - Some(res) => { + /// Flush the `output_buffer` if there are batches ready to output + /// None if no result batch ready. + fn maybe_flush_ready_batch(&mut self) -> Option>>> { + if self.output_buffer.has_completed_batch() { + if let Some(batch) = self.output_buffer.next_completed_batch() { + // HACK: this is not part of `BaselineMetrics` yet, so update it + // manually self.join_metrics.output_batches.add(1); - Ok(StatefulStreamResult::Ready(Some(res))) - } - None => { - self.state = NestedLoopJoinStreamState::Completed; - Ok(StatefulStreamResult::Ready(None)) + + return Some(Poll::Ready(Some(Ok(batch)))); } } + + None } - /// This function's primary purpose is to handle the final output stage required by specific join types after all right-side (probe) data has been exhausted. - /// It is critically important for LEFT*/FULL joins, which must emit left-side (build) rows that found no match. For these cases, it identifies the unmatched rows and prepares the necessary state to output them. - fn prepare_unmatched_output_indices( + /// After joining (l_index@left_buffer x current_right_batch), it will result + /// in a bitmap (the same length as current_right_batch) as the join match + /// result. Use this bitmap to update the global bitmap, for special join + /// types like full joins. + /// + /// Example: + /// After joining l_index=1 (1-indexed row in the left buffer), and the + /// current right batch with 3 elements, this function will be called with + /// arguments: l_index = 1, r_matched = [false, false, true] + /// - If the join type is FullJoin, the 1-index in the left bitmap will be + /// set to true, and also the right bitmap will be bitwise-ORed with the + /// input r_matched bitmap. + /// - For join types that don't require output unmatched rows, this + /// function can be a no-op. 
For inner joins, this function is a no-op; for left + /// joins, only the left bitmap may be updated. + fn update_matched_bitmap( &mut self, - ) -> Result>> { - let Some(left_data) = self.left_data.clone() else { - return internal_err!( - "Expected left_data to be Some in ExhaustedProbeSide state" - ); - }; - let visited_left_side = left_data.bitmap(); - if need_produce_result_in_final(self.join_type) { - // At this stage `visited_left_side` won't be updated, so it's - // safe to report about probe completion. - // - // Setting `is_exhausted` / returning None will prevent from - // multiple calls of `report_probe_completed()` - if !left_data.report_probe_completed() { - self.state = NestedLoopJoinStreamState::Completed; - return Ok(StatefulStreamResult::Ready(None)); - }; + l_index: usize, + r_matched_bitmap: &BooleanArray, + ) -> Result<()> { + let left_data = self.get_left_data()?; - // Only setting up timer, input is exhausted - let _timer = self.join_metrics.join_time.timer(); - // use the global left bitmap to produce the left indices and right indices - let (left_side, right_side) = - get_final_indices_from_shared_bitmap(visited_left_side, self.join_type); - - self.join_result_status = Some(JoinResultProgress { - build_indices: left_side, - probe_indices: right_side, - processed_count: 0, - }); - self.state = NestedLoopJoinStreamState::OutputUnmatchedBuildRows( - RecordBatch::new_empty(self.outer_table.schema()), - ); + // number of successfully joined pairs from (l_index x cur_right_batch) + let joined_len = r_matched_bitmap.true_count(); - Ok(StatefulStreamResult::Continue) - } else { - // end of the join loop - self.state = NestedLoopJoinStreamState::Completed; - Ok(StatefulStreamResult::Ready(None)) + // 1. Maybe update the left bitmap + if need_produce_result_in_final(self.join_type) && (joined_len > 0) { + let mut bitmap = left_data.bitmap().lock(); + bitmap.set_bit(l_index, true); + } + + // 2. 
Maybe updateh the right bitmap + if self.should_track_unmatched_right { + debug_assert!(self.current_right_batch_matched.is_some()); + // after bit-wise or, it will be put back + let right_bitmap = std::mem::take(&mut self.current_right_batch_matched) + .ok_or_else(|| { + internal_datafusion_err!("right batch's bitmap should be present") + })?; + let (buf, nulls) = right_bitmap.into_parts(); + debug_assert!(nulls.is_none()); + let updated_right_bitmap = buf.bitor(r_matched_bitmap.values()); + + self.current_right_batch_matched = + Some(BooleanArray::new(updated_right_bitmap, None)); } + + Ok(()) } } -#[allow(clippy::too_many_arguments)] -fn join_left_and_right_batch( +// ==== Utilities ==== + +/// Apply the join filter between: +/// (l_index th row in left buffer) x (right batch) +/// Returns a bitmap, with successfully joined indices set to true +fn apply_filter_to_row_join_batch( left_batch: &RecordBatch, + l_index: usize, right_batch: &RecordBatch, - join_type: JoinType, - filter: Option<&JoinFilter>, - visited_left_side: &SharedBitmapBuilder, - indices_cache: &mut (UInt64Array, UInt32Array), - right_side_ordered: bool, - max_intermediate_batch_size: usize, -) -> Result<(PrimitiveArray, PrimitiveArray)> { - let (left_side, right_side) = build_join_indices( - left_batch, - right_batch, - filter, - indices_cache, - max_intermediate_batch_size, - ) - .map_err(|e| { - exec_datafusion_err!( - "Fail to build join indices in NestedLoopJoinExec, error: {e}" - ) - })?; - - // set the left bitmap - // and only full join need the left bitmap - if need_produce_result_in_final(join_type) { - let mut bitmap = visited_left_side.lock(); - left_side.values().iter().for_each(|x| { - bitmap.set_bit(*x as usize, true); - }); - } - // adjust the two side indices base on the join type - let (left_side, right_side) = adjust_indices_by_join_type( - left_side, - right_side, - 0..right_batch.num_rows(), - join_type, - right_side_ordered, - )?; + filter: &JoinFilter, +) -> Result { + 
debug_assert!(left_batch.num_rows() != 0 && right_batch.num_rows() != 0); + + let intermediate_batch = if filter.schema.fields().is_empty() { + // If filter is constant (e.g. literal `true`), empty batch can be used + // in the later filter step. + create_record_batch_with_empty_schema( + Arc::new((*filter.schema).clone()), + right_batch.num_rows(), + )? + } else { + build_row_join_batch( + &filter.schema, + left_batch, + l_index, + right_batch, + None, + &filter.column_indices, + JoinSide::Left, + )? + .ok_or_else(|| internal_datafusion_err!("This function assume input batch is not empty, so the intermediate batch can't be empty too"))? + }; + + let filter_result = filter + .expression() + .evaluate(&intermediate_batch)? + .into_array(intermediate_batch.num_rows())?; + let filter_arr = as_boolean_array(&filter_result)?; + + // [Caution] This step has previously introduced bugs + // The filter result is NOT a bitmap; it contains true/false/null values. + // For example, 1 < NULL is evaluated to NULL. Therefore, we must combine (AND) + // the boolean array with its null bitmap to construct a unified bitmap. + let (is_filtered, nulls) = filter_arr.clone().into_parts(); + let bitmap_combined = match nulls { + Some(nulls) => { + let combined = nulls.inner() & &is_filtered; + BooleanArray::new(combined, None) + } + None => BooleanArray::new(is_filtered, None), + }; - Ok((left_side, right_side)) + Ok(bitmap_combined) } -impl Stream for NestedLoopJoinStream { - type Item = Result; +/// This function performs the following steps: +/// 1. Apply filter to probe-side batch +/// 2. Broadcast the left row (build_side_batch\[build_side_index\]) to the +/// filtered probe-side batch +/// 3. 
Concat them together according to `col_indices`, and return the result +/// (None if the result is empty) +/// +/// Example: +/// build_side_batch: +/// a +/// ---- +/// 1 +/// 2 +/// 3 +/// +/// # 0 index element in the build_side_batch (that is `1`) will be used +/// build_side_index: 0 +/// +/// probe_side_batch: +/// b +/// ---- +/// 10 +/// 20 +/// 30 +/// 40 +/// +/// # After applying it, only index 1 and 3 elemnt in probe_side_batch will be +/// # kept +/// probe_side_filter: +/// false +/// true +/// false +/// true +/// +/// +/// # Projections to the build/probe side batch, to construct the output batch +/// col_indices: +/// [(left, 0), (right, 0)] +/// +/// build_side: left +/// +/// ==== +/// Result batch: +/// a b +/// ---- +/// 1 20 +/// 1 40 +fn build_row_join_batch( + output_schema: &Schema, + build_side_batch: &RecordBatch, + build_side_index: usize, + probe_side_batch: &RecordBatch, + probe_side_filter: Option, + // See [`NLJStream`] struct's `column_indices` field for more detail + col_indices: &[ColumnIndex], + // If the build side is left or right, used to interpret the side information + // in `col_indices` + build_side: JoinSide, +) -> Result> { + debug_assert!(build_side != JoinSide::None); + + // TODO(perf): since the output might be projection of right batch, this + // filtering step is more efficient to be done inside the column_index loop + let filtered_probe_batch = if let Some(filter) = probe_side_filter { + &filter_record_batch(probe_side_batch, &filter)? + } else { + probe_side_batch + }; - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - self.poll_next_impl(cx) + if filtered_probe_batch.num_rows() == 0 { + return Ok(None); + } + + // Edge case: downstream operator does not require any columns from this NLJ, + // so allow an empty projection. 
+ // Example: + // SELECT DISTINCT 32 AS col2 + // FROM tab0 AS cor0 + // LEFT OUTER JOIN tab2 AS cor1 + // ON ( NULL ) IS NULL; + if output_schema.fields.is_empty() { + return Ok(Some(create_record_batch_with_empty_schema( + Arc::new(output_schema.clone()), + filtered_probe_batch.num_rows(), + )?)); } + + let mut columns: Vec> = + Vec::with_capacity(output_schema.fields().len()); + + for column_index in col_indices { + let array = if column_index.side == build_side { + // Broadcast the single build-side row to match the filtered + // probe-side batch length + let original_left_array = build_side_batch.column(column_index.index); + let scalar_value = ScalarValue::try_from_array( + original_left_array.as_ref(), + build_side_index, + )?; + scalar_value.to_array_of_size(filtered_probe_batch.num_rows())? + } else { + // Take the filtered probe-side column using compute::take + Arc::clone(filtered_probe_batch.column(column_index.index)) + }; + + columns.push(array); + } + + Ok(Some(RecordBatch::try_new( + Arc::new(output_schema.clone()), + columns, + )?)) } -impl RecordBatchStream for NestedLoopJoinStream { - fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) +/// Special case for `PlaceHolderRowExec` +/// Minimal example: SELECT 1 WHERE EXISTS (SELECT 1); +// +/// # Return +/// If Some, that's the result batch +/// If None, it's not for this special case. Continue execution. 
+fn build_unmatched_batch_empty_schema( + output_schema: SchemaRef, + batch_bitmap: &BooleanArray, + // For left/right/full joins, it needs to fill nulls for another side + join_type: JoinType, +) -> Result> { + let result_size = match join_type { + JoinType::Left + | JoinType::Right + | JoinType::Full + | JoinType::LeftAnti + | JoinType::RightAnti => batch_bitmap.false_count(), + JoinType::LeftSemi | JoinType::RightSemi => batch_bitmap.true_count(), + JoinType::LeftMark | JoinType::RightMark => batch_bitmap.len(), + _ => unreachable!(), + }; + + if output_schema.fields().is_empty() { + Ok(Some(create_record_batch_with_empty_schema( + Arc::clone(&output_schema), + result_size, + )?)) + } else { + Ok(None) } } -impl EmbeddedProjection for NestedLoopJoinExec { - fn with_projection(&self, projection: Option>) -> Result { - self.with_projection(projection) +/// Creates an empty RecordBatch with a specific row count. +/// This is useful for cases where we need a batch with the correct schema and row count +/// but no actual data columns (e.g., for constant filters). +fn create_record_batch_with_empty_schema( + schema: SchemaRef, + row_count: usize, +) -> Result { + let options = RecordBatchOptions::new() + .with_match_field_names(true) + .with_row_count(Some(row_count)); + + RecordBatch::try_new_with_options(schema, vec![], &options).map_err(|e| { + internal_datafusion_err!("Failed to create empty record batch: {}", e) + }) +} + +/// # Example: +/// batch: +/// a +/// ---- +/// 1 +/// 2 +/// 3 +/// +/// batch_bitmap: +/// ---- +/// false +/// true +/// false +/// +/// another_side_schema: +/// [(b, bool), (c, int32)] +/// +/// join_type: JoinType::Left +/// +/// col_indices: ...(please refer to the comment in `NLJStream::column_indices``) +/// +/// batch_side: right +/// +/// # Walkthrough: +/// +/// This executor is performing a right join, and the currently processed right +/// batch is as above. 
After joining it with all buffered left rows, the joined +/// entries are marked by the `batch_bitmap`. +/// This method will keep the unmatched indices on the batch side (right), and pad +/// the left side with nulls. The result would be: +/// +/// b c a +/// ------------------------ +/// Null(bool) Null(Int32) 1 +/// Null(bool) Null(Int32) 3 +fn build_unmatched_batch( + output_schema: SchemaRef, + batch: &RecordBatch, + batch_bitmap: BooleanArray, + // For left/right/full joins, it needs to fill nulls for another side + another_side_schema: SchemaRef, + col_indices: &[ColumnIndex], + join_type: JoinType, + batch_side: JoinSide, +) -> Result> { + // Should not call it for inner joins + debug_assert_ne!(join_type, JoinType::Inner); + debug_assert_ne!(batch_side, JoinSide::None); + + // Handle special case (see function comment) + if let Some(batch) = build_unmatched_batch_empty_schema( + Arc::clone(&output_schema), + &batch_bitmap, + join_type, + )? { + return Ok(Some(batch)); + } + + match join_type { + JoinType::Full | JoinType::Right | JoinType::Left => { + if join_type == JoinType::Right { + debug_assert_eq!(batch_side, JoinSide::Right); + } + if join_type == JoinType::Left { + debug_assert_eq!(batch_side, JoinSide::Left); + } + + // 1. Filter the batch with *flipped* bitmap + // 2. Fill left side with nulls + let flipped_bitmap = not(&batch_bitmap)?; + + // create a recordbatch, with left_schema, of only one row of all nulls + let left_null_columns: Vec> = another_side_schema + .fields() + .iter() + .map(|field| new_null_array(field.data_type(), 1)) + .collect(); + + // Hack: If the left schema is not nullable, the full join result + // might contain null, this is only a temporary batch to construct + // such full join result. 
+ let nullable_left_schema = Arc::new(Schema::new( + another_side_schema + .fields() + .iter() + .map(|field| { + (**field).clone().with_nullable(true) + }) + .collect::>(), + )); + let left_null_batch = if nullable_left_schema.fields.is_empty() { + // Left input can be an empty relation, in this case left relation + // won't be used to construct the result batch (i.e. not in `col_indices`) + create_record_batch_with_empty_schema(nullable_left_schema, 0)? + } else { + RecordBatch::try_new(nullable_left_schema, left_null_columns)? + }; + + debug_assert_ne!(batch_side, JoinSide::None); + let opposite_side = batch_side.negate(); + + build_row_join_batch(&output_schema, &left_null_batch, 0, batch, Some(flipped_bitmap), col_indices, opposite_side) + + }, + JoinType::RightSemi | JoinType::RightAnti | JoinType::LeftSemi | JoinType::LeftAnti => { + if matches!(join_type, JoinType::RightSemi | JoinType::RightAnti) { + debug_assert_eq!(batch_side, JoinSide::Right); + } + if matches!(join_type, JoinType::LeftSemi | JoinType::LeftAnti) { + debug_assert_eq!(batch_side, JoinSide::Left); + } + + let bitmap = if matches!(join_type, JoinType::LeftSemi | JoinType::RightSemi) { + batch_bitmap.clone() + } else { + not(&batch_bitmap)? 
+ }; + + if bitmap.true_count() == 0 { + return Ok(None); + } + + let mut columns: Vec> = + Vec::with_capacity(output_schema.fields().len()); + + for column_index in col_indices { + debug_assert!(column_index.side == batch_side); + + let col = batch.column(column_index.index); + let filtered_col = filter(col, &bitmap)?; + + columns.push(filtered_col); + } + + Ok(Some(RecordBatch::try_new(Arc::clone(&output_schema), columns)?)) + }, + JoinType::RightMark | JoinType::LeftMark => { + if join_type == JoinType::RightMark { + debug_assert_eq!(batch_side, JoinSide::Right); + } + if join_type == JoinType::LeftMark { + debug_assert_eq!(batch_side, JoinSide::Left); + } + + let mut columns: Vec> = + Vec::with_capacity(output_schema.fields().len()); + + // Hack to deal with the borrow checker + let mut right_batch_bitmap_opt = Some(batch_bitmap); + + for column_index in col_indices { + if column_index.side == batch_side { + let col = batch.column(column_index.index); + + columns.push(Arc::clone(col)); + } else if column_index.side == JoinSide::None { + let right_batch_bitmap = std::mem::take(&mut right_batch_bitmap_opt); + match right_batch_bitmap { + Some(right_batch_bitmap) => {columns.push(Arc::new(right_batch_bitmap))}, + None => unreachable!("Should only be one mark column"), + } + } else { + return internal_err!("Not possible to have this join side for RightMark join"); + } + } + + Ok(Some(RecordBatch::try_new(Arc::clone(&output_schema), columns)?)) + } + _ => internal_err!("If batch is at right side, this function must be handling Full/Right/RightSemi/RightAnti/RightMark joins"), } } @@ -1194,7 +1900,6 @@ pub(crate) mod tests { common, expressions::Column, repartition::RepartitionExec, test::build_table_i32, }; - use arrow::array::Int32Array; use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field}; use datafusion_common::test_util::batches_to_sort_string; @@ -1748,167 +2453,6 @@ pub(crate) mod tests { Ok(()) } - fn prepare_mod_join_filter() -> 
JoinFilter { - let column_indices = vec![ - ColumnIndex { - index: 1, - side: JoinSide::Left, - }, - ColumnIndex { - index: 1, - side: JoinSide::Right, - }, - ]; - let intermediate_schema = Schema::new(vec![ - Field::new("x", DataType::Int32, true), - Field::new("x", DataType::Int32, true), - ]); - - // left.b1 % 3 - let left_mod = Arc::new(BinaryExpr::new( - Arc::new(Column::new("x", 0)), - Operator::Modulo, - Arc::new(Literal::new(ScalarValue::Int32(Some(3)))), - )) as Arc; - // left.b1 % 3 != 0 - let left_filter = Arc::new(BinaryExpr::new( - left_mod, - Operator::NotEq, - Arc::new(Literal::new(ScalarValue::Int32(Some(0)))), - )) as Arc; - - // right.b2 % 5 - let right_mod = Arc::new(BinaryExpr::new( - Arc::new(Column::new("x", 1)), - Operator::Modulo, - Arc::new(Literal::new(ScalarValue::Int32(Some(5)))), - )) as Arc; - // right.b2 % 5 != 0 - let right_filter = Arc::new(BinaryExpr::new( - right_mod, - Operator::NotEq, - Arc::new(Literal::new(ScalarValue::Int32(Some(0)))), - )) as Arc; - // filter = left.b1 % 3 != 0 and right.b2 % 5 != 0 - let filter_expression = - Arc::new(BinaryExpr::new(left_filter, Operator::And, right_filter)) - as Arc; - - JoinFilter::new( - filter_expression, - column_indices, - Arc::new(intermediate_schema), - ) - } - - fn generate_columns(num_columns: usize, num_rows: usize) -> Vec> { - let column = (1..=num_rows).map(|x| x as i32).collect(); - vec![column; num_columns] - } - - #[rstest] - #[tokio::test] - async fn join_maintains_right_order( - #[values( - JoinType::Inner, - JoinType::Right, - JoinType::RightAnti, - JoinType::RightSemi - )] - join_type: JoinType, - #[values(1, 100, 1000)] left_batch_size: usize, - #[values(1, 100, 1000)] right_batch_size: usize, - #[values(1001, 10000)] batch_size: usize, - ) -> Result<()> { - let left_columns = generate_columns(3, 1000); - let left = build_table( - ("a1", &left_columns[0]), - ("b1", &left_columns[1]), - ("c1", &left_columns[2]), - Some(left_batch_size), - Vec::new(), - ); - - let 
right_columns = generate_columns(3, 1000); - let right = build_table( - ("a2", &right_columns[0]), - ("b2", &right_columns[1]), - ("c2", &right_columns[2]), - Some(right_batch_size), - vec!["a2", "b2", "c2"], - ); - - let filter = prepare_mod_join_filter(); - - let nested_loop_join = Arc::new(NestedLoopJoinExec::try_new( - left, - Arc::clone(&right), - Some(filter), - &join_type, - None, - )?) as Arc; - assert_eq!(nested_loop_join.maintains_input_order(), vec![false, true]); - - let right_column_indices = match join_type { - JoinType::Inner | JoinType::Right => vec![3, 4, 5], - JoinType::RightAnti | JoinType::RightSemi => vec![0, 1, 2], - _ => unreachable!(), - }; - - let right_ordering = right.output_ordering().unwrap(); - let join_ordering = nested_loop_join.output_ordering().unwrap(); - for (right, join) in right_ordering.iter().zip(join_ordering.iter()) { - let right_column = right.expr.as_any().downcast_ref::().unwrap(); - let join_column = join.expr.as_any().downcast_ref::().unwrap(); - assert_eq!(join_column.name(), join_column.name()); - assert_eq!( - right_column_indices[right_column.index()], - join_column.index() - ); - assert_eq!(right.options, join.options); - } - - let task_ctx = new_task_ctx(batch_size); - let batches = nested_loop_join - .execute(0, task_ctx)? 
- .try_collect::>() - .await?; - - // Make sure that the order of the right side is maintained - let mut prev_values = [i32::MIN, i32::MIN, i32::MIN]; - - for (batch_index, batch) in batches.iter().enumerate() { - let columns: Vec<_> = right_column_indices - .iter() - .map(|&i| { - batch - .column(i) - .as_any() - .downcast_ref::() - .unwrap() - }) - .collect(); - - for row in 0..batch.num_rows() { - let current_values = [ - columns[0].value(row), - columns[1].value(row), - columns[2].value(row), - ]; - assert!( - current_values - .into_iter() - .zip(prev_values) - .all(|(current, prev)| current >= prev), - "batch_index: {batch_index} row: {row} current: {current_values:?}, prev: {prev_values:?}" - ); - prev_values = current_values; - } - } - - Ok(()) - } - /// Returns the column names on the schema fn columns(schema: &Schema) -> Vec { schema.fields().iter().map(|f| f.name().clone()).collect() diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 65e7a6106f09..a7cd81a98fad 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -774,6 +774,23 @@ impl OnceFut { } } +/// Should we use a bitmap to track each incoming right batch's each row's +/// 'joined' status. +/// +/// For example in right joins, we have to use a bit map to track matched +/// right side rows, and later enter a `EmitRightUnmatched` stage to emit +/// unmatched right rows. +pub(crate) fn need_produce_right_in_final(join_type: JoinType) -> bool { + matches!( + join_type, + JoinType::Full + | JoinType::Right + | JoinType::RightAnti + | JoinType::RightMark + | JoinType::RightSemi + ) +} + /// Some type `join_type` of join need to maintain the matched indices bit map for the left side, and /// use the bit map to generate the part of result of the join. 
/// diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index e74c96e9d442..ad21bdac6d2d 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -4148,9 +4148,9 @@ logical_plan 03)----TableScan: left_table projection=[a, b, c] 04)----TableScan: right_table projection=[x, y, z] physical_plan -01)NestedLoopJoinExec: join_type=Inner, filter=a@0 < x@1 -02)--DataSourceExec: partitions=1, partition_sizes=[0] -03)--SortExec: expr=[x@0 ASC NULLS LAST], preserve_partitioning=[false] +01)SortExec: expr=[x@3 ASC NULLS LAST], preserve_partitioning=[false] +02)--NestedLoopJoinExec: join_type=Inner, filter=a@0 < x@1 +03)----DataSourceExec: partitions=1, partition_sizes=[0] 04)----DataSourceExec: partitions=1, partition_sizes=[0] query TT