1717
1818//! Helper functions for the table implementation
1919
20- use std:: mem;
2120use std:: sync:: Arc ;
2221
2322use datafusion_catalog:: Session ;
@@ -40,7 +39,6 @@ use log::{debug, trace};
4039
4140use datafusion_common:: tree_node:: { TreeNode , TreeNodeRecursion } ;
4241use datafusion_common:: { Column , DFSchema , DataFusionError } ;
43- use datafusion_datasource:: file_groups:: FileGroup ;
4442use datafusion_expr:: { Expr , Volatility } ;
4543use datafusion_physical_expr:: create_physical_expr;
4644use object_store:: path:: Path ;
@@ -122,39 +120,6 @@ pub fn expr_applicable_for_cols(col_names: &[&str], expr: &Expr) -> bool {
122120/// The maximum number of concurrent listing requests
123121const CONCURRENCY_LIMIT : usize = 100 ;
124122
125- /// Partition the list of files into `n` groups
126- pub fn split_files ( mut file_group : FileGroup , n : usize ) -> Vec < FileGroup > {
127- if file_group. is_empty ( ) {
128- return vec ! [ ] ;
129- }
130-
131- // ObjectStore::list does not guarantee any consistent order and for some
132- // implementations such as LocalFileSystem, it may be inconsistent. Thus
133- // Sort files by path to ensure consistent plans when run more than once.
134- file_group. files . sort_by ( |a, b| a. path ( ) . cmp ( b. path ( ) ) ) ;
135-
136- // effectively this is div with rounding up instead of truncating
137- let chunk_size = file_group. len ( ) . div_ceil ( n) ;
138- let mut chunks = Vec :: with_capacity ( n) ;
139- let mut current_chunk = Vec :: with_capacity ( chunk_size) ;
140- for file in file_group. files . drain ( ..) {
141- current_chunk. push ( file) ;
142- if current_chunk. len ( ) == chunk_size {
143- let full_chunk = FileGroup :: new ( mem:: replace (
144- & mut current_chunk,
145- Vec :: with_capacity ( chunk_size) ,
146- ) ) ;
147- chunks. push ( full_chunk) ;
148- }
149- }
150-
151- if !current_chunk. is_empty ( ) {
152- chunks. push ( FileGroup :: new ( current_chunk) )
153- }
154-
155- chunks
156- }
157-
158123pub struct Partition {
159124 /// The path to the partition, including the table prefix
160125 path : Path ,
@@ -541,6 +506,7 @@ mod tests {
541506 use object_store:: memory:: InMemory ;
542507 use std:: any:: Any ;
543508 use std:: ops:: Not ;
509+ use datafusion_datasource:: file_groups:: FileGroup ;
544510 // use futures::StreamExt;
545511
546512 use super :: * ;
@@ -561,32 +527,33 @@ mod tests {
561527 new_partitioned_file( "e" ) ,
562528 ] ) ;
563529
564- let chunks = split_files ( files. clone ( ) , 1 ) ;
530+ let chunks = files. clone ( ) . split_files ( 1 ) ;
565531 assert_eq ! ( 1 , chunks. len( ) ) ;
566532 assert_eq ! ( 5 , chunks[ 0 ] . len( ) ) ;
567533
568- let chunks = split_files ( files. clone ( ) , 2 ) ;
534+ let chunks = files. clone ( ) . split_files ( 2 ) ;
569535 assert_eq ! ( 2 , chunks. len( ) ) ;
570536 assert_eq ! ( 3 , chunks[ 0 ] . len( ) ) ;
571537 assert_eq ! ( 2 , chunks[ 1 ] . len( ) ) ;
572538
573- let chunks = split_files ( files. clone ( ) , 5 ) ;
539+ let chunks = files. clone ( ) . split_files ( 5 ) ;
574540 assert_eq ! ( 5 , chunks. len( ) ) ;
575541 assert_eq ! ( 1 , chunks[ 0 ] . len( ) ) ;
576542 assert_eq ! ( 1 , chunks[ 1 ] . len( ) ) ;
577543 assert_eq ! ( 1 , chunks[ 2 ] . len( ) ) ;
578544 assert_eq ! ( 1 , chunks[ 3 ] . len( ) ) ;
579545 assert_eq ! ( 1 , chunks[ 4 ] . len( ) ) ;
580546
581- let chunks = split_files ( files , 123 ) ;
547+ let chunks = files . clone ( ) . split_files ( 123 ) ;
582548 assert_eq ! ( 5 , chunks. len( ) ) ;
583549 assert_eq ! ( 1 , chunks[ 0 ] . len( ) ) ;
584550 assert_eq ! ( 1 , chunks[ 1 ] . len( ) ) ;
585551 assert_eq ! ( 1 , chunks[ 2 ] . len( ) ) ;
586552 assert_eq ! ( 1 , chunks[ 3 ] . len( ) ) ;
587553 assert_eq ! ( 1 , chunks[ 4 ] . len( ) ) ;
588554
589- let chunks = split_files ( FileGroup :: default ( ) , 2 ) ;
555+ let mut empty_group = FileGroup :: default ( ) ;
556+ let chunks = empty_group. split_files ( 2 ) ;
590557 assert_eq ! ( 0 , chunks. len( ) ) ;
591558 }
592559
0 commit comments