diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index 8117e101ea99..82163da64af8 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -46,6 +46,7 @@ use datafusion_physical_expr::{ }; use bytes::{Buf, Bytes}; +use datafusion_common::config::ConfigOptions; use futures::{ready, StreamExt, TryStreamExt}; use object_store::{GetOptions, GetResultPayload, ObjectStore}; use tokio::io::AsyncWriteExt; @@ -117,34 +118,6 @@ impl CsvExec { pub fn escape(&self) -> Option { self.escape } - - /// Redistribute files across partitions according to their size - /// See comments on `repartition_file_groups()` for more detail. - /// - /// Return `None` if can't get repartitioned(empty/compressed file). - pub fn get_repartitioned( - &self, - target_partitions: usize, - repartition_file_min_size: usize, - ) -> Option { - // Parallel execution on compressed CSV file is not supported yet. - if self.file_compression_type.is_compressed() { - return None; - } - - let repartitioned_file_groups_option = FileScanConfig::repartition_file_groups( - self.base_config.file_groups.clone(), - target_partitions, - repartition_file_min_size, - ); - - if let Some(repartitioned_file_groups) = repartitioned_file_groups_option { - let mut new_plan = self.clone(); - new_plan.base_config.file_groups = repartitioned_file_groups; - return Some(new_plan); - } - None - } } impl DisplayAs for CsvExec { @@ -205,6 +178,35 @@ impl ExecutionPlan for CsvExec { Ok(self) } + /// Redistribute files across partitions according to their size + /// See comments on `repartition_file_groups()` for more detail. + /// + /// Return `None` if can't get repartitioned(empty/compressed file). + fn repartitioned( + &self, + target_partitions: usize, + config: &ConfigOptions, + ) -> Result>> { + let repartition_file_min_size = config.optimizer.repartition_file_min_size; + // Parallel execution on compressed CSV file is not supported yet. + if self.file_compression_type.is_compressed() { + return Ok(None); + } + + let repartitioned_file_groups_option = FileScanConfig::repartition_file_groups( + self.base_config.file_groups.clone(), + target_partitions, + repartition_file_min_size, + ); + + if let Some(repartitioned_file_groups) = repartitioned_file_groups_option { + let mut new_plan = self.clone(); + new_plan.base_config.file_groups = repartitioned_file_groups; + return Ok(Some(Arc::new(new_plan))); + } + Ok(None) + } + fn execute( &self, partition: usize, diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 6643e4127dbd..ea0a9698ff5c 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -527,6 +527,7 @@ mod tests { }; use arrow_schema::Field; use chrono::Utc; + use datafusion_common::config::ConfigOptions; use crate::physical_plan::{DefaultDisplay, VerboseDisplay}; @@ -828,11 +829,7 @@ mod tests { None, ); - let partitioned_file = parquet_exec - .get_repartitioned(4, 0) - .base_config() - .file_groups - .clone(); + let partitioned_file = repartition_with_size(&parquet_exec, 4, 0); assert!(partitioned_file[0][0].range.is_none()); } @@ -893,13 +890,8 @@ mod tests { None, ); - let actual = file_groups_to_vec( - parquet_exec - .get_repartitioned(n_partition, 10) - .base_config() - .file_groups - .clone(), - ); + let actual = + repartition_with_size_to_vec(&parquet_exec, n_partition, 10); assert_eq!(expected, &actual); } @@ -927,13 +919,7 @@ mod tests { None, ); - let actual = file_groups_to_vec( - parquet_exec - .get_repartitioned(4, 10) - .base_config() - .file_groups - .clone(), - ); + let actual = repartition_with_size_to_vec(&parquet_exec, 4, 10); let expected = vec![ (0, "a".to_string(), 0, 31), (1, "a".to_string(), 31, 62), @@ -964,13 +950,7 @@ mod tests { None, ); - let actual = file_groups_to_vec( - parquet_exec - .get_repartitioned(96, 5) - .base_config() - .file_groups - .clone(), - ); + let actual = repartition_with_size_to_vec(&parquet_exec, 96, 5); let expected = vec![ (0, "a".to_string(), 0, 1), (1, "a".to_string(), 1, 2), @@ -1007,13 +987,7 @@ mod tests { None, ); - let actual = file_groups_to_vec( - parquet_exec - .get_repartitioned(3, 10) - .base_config() - .file_groups - .clone(), - ); + let actual = repartition_with_size_to_vec(&parquet_exec, 3, 10); let expected = vec![ (0, "a".to_string(), 0, 34), (1, "a".to_string(), 34, 40), @@ -1046,13 +1020,7 @@ mod tests { None, ); - let actual = file_groups_to_vec( - parquet_exec - .get_repartitioned(2, 10) - .base_config() - .file_groups - .clone(), - ); + let actual = repartition_with_size_to_vec(&parquet_exec, 2, 10); let expected = vec![ (0, "a".to_string(), 0, 40), (0, "b".to_string(), 0, 10), @@ -1086,11 +1054,7 @@ mod tests { None, ); - let actual = parquet_exec - .get_repartitioned(65, 10) - .base_config() - .file_groups - .clone(); + let actual = repartition_with_size(&parquet_exec, 65, 10); assert_eq!(2, actual.len()); } @@ -1115,17 +1079,47 @@ mod tests { None, ); - let actual = parquet_exec - .get_repartitioned(65, 500) + let actual = repartition_with_size(&parquet_exec, 65, 500); + assert_eq!(1, actual.len()); + } + + /// Calls `ParquetExec.repartitioned` with the specified + /// `target_partitions` and `repartition_file_min_size`, returning the + /// resulting `PartitionedFile`s + fn repartition_with_size( + parquet_exec: &ParquetExec, + target_partitions: usize, + repartition_file_min_size: usize, + ) -> Vec> { + let mut config = ConfigOptions::new(); + config.optimizer.repartition_file_min_size = repartition_file_min_size; + + parquet_exec + .repartitioned(target_partitions, &config) + .unwrap() // unwrap Result + .unwrap() // unwrap Option + .as_any() + .downcast_ref::() + .unwrap() .base_config() .file_groups - .clone(); - assert_eq!(1, actual.len()); + .clone() } - fn file_groups_to_vec( - file_groups: Vec>, + /// Calls `repartition_with_size` and returns a tuple for each output `PartitionedFile`: + /// + /// `(partition index, file path, start, end)` + fn repartition_with_size_to_vec( + parquet_exec: &ParquetExec, + target_partitions: usize, + repartition_file_min_size: usize, ) -> Vec<(usize, String, i64, i64)> { + let file_groups = repartition_with_size( + parquet_exec, + target_partitions, + repartition_file_min_size, + ); + file_groups .iter() .enumerate() diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 3a2459bec817..f6e999f60249 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -259,26 +259,6 @@ impl ParquetExec { self.enable_bloom_filter .unwrap_or(config_options.execution.parquet.bloom_filter_enabled) } - - /// Redistribute files across partitions according to their size - /// See comments on `get_file_groups_repartitioned()` for more detail. - pub fn get_repartitioned( - &self, - target_partitions: usize, - repartition_file_min_size: usize, - ) -> Self { - let repartitioned_file_groups_option = FileScanConfig::repartition_file_groups( - self.base_config.file_groups.clone(), - target_partitions, - repartition_file_min_size, - ); - - let mut new_plan = self.clone(); - if let Some(repartitioned_file_groups) = repartitioned_file_groups_option { - new_plan.base_config.file_groups = repartitioned_file_groups; - } - new_plan - } } impl DisplayAs for ParquetExec { @@ -349,6 +329,27 @@ impl ExecutionPlan for ParquetExec { Ok(self) } + /// Redistribute files across partitions according to their size + /// See comments on `get_file_groups_repartitioned()` for more detail. + fn repartitioned( + &self, + target_partitions: usize, + config: &ConfigOptions, + ) -> Result>> { + let repartition_file_min_size = config.optimizer.repartition_file_min_size; + let repartitioned_file_groups_option = FileScanConfig::repartition_file_groups( + self.base_config.file_groups.clone(), + target_partitions, + repartition_file_min_size, + ); + + let mut new_plan = self.clone(); + if let Some(repartitioned_file_groups) = repartitioned_file_groups_option { + new_plan.base_config.file_groups = repartitioned_file_groups; + } + Ok(Some(Arc::new(new_plan))) + } + fn execute( &self, partition_index: usize, diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 7b91dce32aa9..2889c268bafa 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -26,9 +26,6 @@ use std::fmt::Formatter; use std::sync::Arc; use crate::config::ConfigOptions; -use crate::datasource::physical_plan::CsvExec; -#[cfg(feature = "parquet")] -use crate::datasource::physical_plan::ParquetExec; use crate::error::Result; use crate::physical_optimizer::utils::{ add_sort_above, get_children_exectrees, get_plan_string, is_coalesce_partitions, @@ -1188,7 +1185,6 @@ fn ensure_distribution( // When `false`, round robin repartition will not be added to increase parallelism let enable_round_robin = config.optimizer.enable_round_robin_repartition; let repartition_file_scans = config.optimizer.repartition_file_scans; - let repartition_file_min_size = config.optimizer.repartition_file_min_size; let batch_size = config.execution.batch_size; let is_unbounded = unbounded_output(&dist_context.plan); // Use order preserving variants either of the conditions true @@ -1265,25 +1261,13 @@ fn ensure_distribution( // Unless partitioning doesn't increase the partition count, it is not beneficial: && child.output_partitioning().partition_count() < target_partitions { - // When `repartition_file_scans` is set, leverage source operators - // (`ParquetExec`, `CsvExec` etc.) to increase parallelism at the source. + // When `repartition_file_scans` is set, attempt to increase + // parallelism at the source. if repartition_file_scans { - #[cfg(feature = "parquet")] - if let Some(parquet_exec) = - child.as_any().downcast_ref::() + if let Some(new_child) = + child.repartitioned(target_partitions, config)? { - child = Arc::new(parquet_exec.get_repartitioned( - target_partitions, - repartition_file_min_size, - )); - } - if let Some(csv_exec) = child.as_any().downcast_ref::() { - if let Some(csv_exec) = csv_exec.get_repartitioned( - target_partitions, - repartition_file_min_size, - ) { - child = Arc::new(csv_exec); - } + child = new_child; } } // Increase parallelism by adding round-robin repartitioning @@ -1644,8 +1628,8 @@ mod tests { use crate::datasource::file_format::file_compression_type::FileCompressionType; use crate::datasource::listing::PartitionedFile; use crate::datasource::object_store::ObjectStoreUrl; - use crate::datasource::physical_plan::FileScanConfig; use crate::datasource::physical_plan::ParquetExec; + use crate::datasource::physical_plan::{CsvExec, FileScanConfig}; use crate::physical_optimizer::enforce_sorting::EnforceSorting; use crate::physical_optimizer::output_requirements::OutputRequirements; use crate::physical_plan::aggregates::{ diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index b2f81579f8e8..3ada2fa163fd 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -76,6 +76,7 @@ pub use crate::metrics::Metric; pub use crate::topk::TopK; pub use crate::visitor::{accept, visit_execution_plan, ExecutionPlanVisitor}; +use datafusion_common::config::ConfigOptions; pub use datafusion_common::hash_utils; pub use datafusion_common::utils::project_schema; pub use datafusion_common::{internal_err, ColumnStatistics, Statistics}; @@ -209,7 +210,32 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { children: Vec>, ) -> Result>; - /// creates an iterator + /// If supported, attempt to increase the partitioning of this `ExecutionPlan` to + /// produce `target_partitions` partitions. + /// + /// If the `ExecutionPlan` does not support changing its partitioning, + /// returns `Ok(None)` (the default). + /// + /// It is the `ExecutionPlan` can increase its partitioning, but not to the + /// `target_partitions`, it may return an ExecutionPlan with fewer + /// partitions. This might happen, for example, if each new partition would + /// be too small to be efficiently processed individually. + /// + /// The DataFusion optimizer attempts to use as many threads as possible by + /// repartitioning its inputs to match the target number of threads + /// available (`target_partitions`). Some data sources, such as the built in + /// CSV and Parquet readers, implement this method as they are able to read + /// from their input files in parallel, regardless of how the source data is + /// split amongst files. + fn repartitioned( + &self, + _target_partitions: usize, + _config: &ConfigOptions, + ) -> Result>> { + Ok(None) + } + + /// Begin execution of `partition`, returning a stream of [`RecordBatch`]es. fn execute( &self, partition: usize, @@ -217,7 +243,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { ) -> Result; /// Return a snapshot of the set of [`Metric`]s for this - /// [`ExecutionPlan`]. + /// [`ExecutionPlan`]. If no `Metric`s are available, return None. /// /// While the values of the metrics in the returned /// [`MetricsSet`]s may change as execution progresses, the