1515// specific language governing permissions and limitations 
1616// under the License. 
1717
18- use  std:: any:: Any ; 
19- use  std:: collections:: HashMap ; 
20- use  std:: pin:: Pin ; 
21- use  std:: sync:: Arc ; 
22- use  std:: task:: { ready,  Context ,  Poll } ; 
23- 
2418use  super :: { 
2519    ColumnStatistics ,  DisplayAs ,  ExecutionPlanProperties ,  PlanProperties , 
2620    RecordBatchStream ,  SendableRecordBatchStream ,  Statistics , 
@@ -38,6 +32,13 @@ use crate::{
3832    metrics:: { BaselineMetrics ,  ExecutionPlanMetricsSet ,  MetricsSet } , 
3933    DisplayFormatType ,  ExecutionPlan , 
4034} ; 
35+ use  arrow:: array:: AsArray ; 
36+ use  arrow:: compute:: IncrementalRecordBatchBuilder ; 
37+ use  std:: any:: Any ; 
38+ use  std:: collections:: HashMap ; 
39+ use  std:: pin:: Pin ; 
40+ use  std:: sync:: Arc ; 
41+ use  std:: task:: { ready,  Context ,  Poll } ; 
4142
4243use  arrow:: compute:: filter_record_batch; 
4344use  arrow:: datatypes:: { DataType ,  SchemaRef } ; 
@@ -393,6 +394,11 @@ impl ExecutionPlan for FilterExec {
393394        partition :  usize , 
394395        context :  Arc < TaskContext > , 
395396    )  -> Result < SendableRecordBatchStream >  { 
397+         // todo thread target size through 
398+         let  batch_size = 8192 ; 
399+         let  output_batch_builder =
400+             IncrementalRecordBatchBuilder :: try_new ( self . schema ( ) ,  batch_size) ?; 
401+ 
396402        trace ! ( "Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}" ,  partition,  context. session_id( ) ,  context. task_id( ) ) ; 
397403        let  baseline_metrics = BaselineMetrics :: new ( & self . metrics ,  partition) ; 
398404        Ok ( Box :: pin ( FilterExecStream  { 
@@ -401,6 +407,7 @@ impl ExecutionPlan for FilterExec {
401407            input :  self . input . execute ( partition,  context) ?, 
402408            baseline_metrics, 
403409            projection :  self . projection . clone ( ) , 
410+             output_batch_builder :  Some ( output_batch_builder) , 
404411        } ) ) 
405412    } 
406413
@@ -647,6 +654,12 @@ struct FilterExecStream {
647654baseline_metrics :  BaselineMetrics , 
648655    /// The projection indices of the columns in the input schema 
649656projection :  Option < Vec < usize > > , 
657+     /// The currently in progress output batch 
658+ /// 
659+ /// This structure produces cleanly sized batches of target_size 
660+ /// 
661+ /// When None, the input is exhausted and the final output batch has already been built 
662+ output_batch_builder :  Option < IncrementalRecordBatchBuilder > , 
650663} 
651664
652665pub  fn  batch_filter ( 
@@ -689,6 +702,46 @@ fn filter_and_project(
689702        } ) 
690703} 
691704
705+ impl  FilterExecStream  { 
706+     /// Evaluates the predicate on the given batch, applies the optional projection, and appends any rows that match 
707+ /// to the in-progress output batch builder. Returns an internal error if the predicate result is not a boolean array; panics if `output_batch_builder` is `None`. 
708+ fn  filter_batch ( & mut  self ,  batch :  RecordBatch )  -> Result < ( ) >  { 
709+         self . predicate 
710+             . evaluate ( & batch) 
711+             . and_then ( |v| v. into_array ( batch. num_rows ( ) ) ) 
712+             . and_then ( |filter| { 
713+                 let  Some ( filter)  = filter. as_boolean_opt ( )  else  { 
714+                     return  internal_err ! ( 
715+                         "Cannot create filter_array from non-boolean predicates" 
716+                     ) ; 
717+                 } ; 
718+ 
719+                 let  batch = match  self . projection . as_ref ( )  { 
720+                     Some ( projection)  => { 
721+                         let  projected_columns = projection
722+                             . iter ( ) 
723+                             . map ( |i| Arc :: clone ( batch. column ( * i) ) ) 
724+                             . collect ( ) ; 
725+                         // SAFETY: the input was a valid RecordBatch, so the projected columns all have batch.num_rows() rows (assumes self.schema is the projected schema -- TODO confirm) 
726+                         unsafe  { 
727+                             RecordBatch :: new_unchecked ( 
728+                                 Arc :: clone ( & self . schema ) , 
729+                                 projected_columns, 
730+                                 batch. num_rows ( ) , 
731+                             ) 
732+                         } 
733+                     } 
734+                     None  => batch, 
735+                 } ; 
736+                 let  output_batch_builder = self 
737+                     . output_batch_builder 
738+                     . as_mut ( ) 
739+                     . expect ( "output_batch_builder should be Some" ) ; 
740+                 Ok ( output_batch_builder. append_filtered ( batch,  filter) ?) 
741+             } ) 
742+     } 
743+ } 
744+ 
692745impl  Stream  for  FilterExecStream  { 
693746    type  Item  = Result < RecordBatch > ; 
694747
@@ -698,23 +751,43 @@ impl Stream for FilterExecStream {
698751    )  -> Poll < Option < Self :: Item > >  { 
699752        let  poll; 
700753        loop  { 
754+             // Input is exhausted and there are no more batches to process, so done 
755+             let  Some ( output_batch_builder)  = self . output_batch_builder . as_mut ( )  else  { 
756+                 poll = Poll :: Ready ( None ) ; 
757+                 break ; 
758+             } ; 
759+ 
760+             // If we had a batch ready, return it 
761+             if  let  Some ( batch)  = output_batch_builder. next_batch ( )  { 
762+                 poll = Poll :: Ready ( Some ( Ok ( batch) ) ) ; 
763+                 break ; 
764+             } 
765+ 
766+             // poll next input batch 
701767            match  ready ! ( self . input. poll_next_unpin( cx) )  { 
702768                Some ( Ok ( batch) )  => { 
703-                     let  timer = self . baseline_metrics . elapsed_compute ( ) . timer ( ) ; 
704-                     let  filtered_batch = filter_and_project ( 
705-                         & batch, 
706-                         & self . predicate , 
707-                         self . projection . as_ref ( ) , 
708-                         & self . schema , 
709-                     ) ?; 
769+                     // do the actual work of filtering the batch 
770+                     let  time = self . baseline_metrics . elapsed_compute ( ) . clone ( ) ;  // clone so we can reuse it but it shares the same underlying counter 
771+                     let  timer = time. timer ( ) ; 
772+                     self . filter_batch ( batch) ?; 
710773                    timer. done ( ) ; 
711-                     // Skip entirely filtered batches 
712-                     if  filtered_batch. num_rows ( )  == 0  { 
713-                         continue ; 
714-                     } 
715-                     poll = Poll :: Ready ( Some ( Ok ( filtered_batch) ) ) ; 
774+                     continue ;  // Continue to the next batch 
775+                 } 
776+                 None  => { 
777+                     // end of input stream, finalize the output batch 
778+                     let  output_batch_builder = self 
779+                         . output_batch_builder 
780+                         . take ( ) 
781+                         . expect ( "output_batch_builder should be Some" ) ; 
782+                     let  mut  completed_batches = output_batch_builder. build ( ) ?; 
783+                     assert ! ( 
784+                         completed_batches. len( )  <= 1 , 
785+                         "FilterExecStream should produce at most one batch" 
786+                     ) ; 
787+                     poll = Poll :: Ready ( completed_batches. pop_front ( ) . map ( Ok ) ) ; 
716788                    break ; 
717789                } 
790+                 // error 
718791                value => { 
719792                    poll = Poll :: Ready ( value) ; 
720793                    break ; 
0 commit comments