@@ -49,49 +49,120 @@ macro_rules! merge_helper {
4949    } } ; 
5050} 
5151
52- /// Perform a streaming merge of [`SendableRecordBatchStream`] based on provided sort expressions 
53- /// while preserving order. 
54- pub  fn  streaming_merge ( 
52+ #[ derive( Default ) ]  
53+ pub  struct  StreamingMergeBuilder < ' a >  { 
5554    streams :  Vec < SendableRecordBatchStream > , 
56-     schema :  SchemaRef , 
57-     expressions :  & [ PhysicalSortExpr ] , 
58-     metrics :  BaselineMetrics , 
59-     batch_size :  usize , 
55+     schema :  Option < SchemaRef > , 
56+     expressions :  & ' a   [ PhysicalSortExpr ] , 
57+     metrics :  Option < BaselineMetrics > , 
58+     batch_size :  Option < usize > , 
6059    fetch :  Option < usize > , 
61-     reservation :  MemoryReservation , 
62- )  ->  Result < SendableRecordBatchStream >   { 
63-      // If there are no sort expressions, preserving the order 
64-      // doesn't mean anything (and result in infinite loops) 
65-     if  expressions . is_empty ( )  { 
66-         return   internal_err ! ( "Sort expressions cannot be empty for streaming merge" ) ; 
60+     reservation :  Option < MemoryReservation > , 
61+ } 
62+ 
63+ impl < ' a >   StreamingMergeBuilder < ' a >   { 
64+     pub   fn   new ( )  ->  Self  { 
65+         Self :: default ( ) 
6766    } 
68-     // Special case single column comparisons with optimized cursor implementations 
69-     if  expressions. len ( )  == 1  { 
70-         let  sort = expressions[ 0 ] . clone ( ) ; 
71-         let  data_type = sort. expr . data_type ( schema. as_ref ( ) ) ?; 
72-         downcast_primitive !  { 
73-             data_type => ( primitive_merge_helper,  sort,  streams,  schema,  metrics,  batch_size,  fetch,  reservation) , 
74-             DataType :: Utf8  => merge_helper!( StringArray ,  sort,  streams,  schema,  metrics,  batch_size,  fetch,  reservation) 
75-             DataType :: LargeUtf8  => merge_helper!( LargeStringArray ,  sort,  streams,  schema,  metrics,  batch_size,  fetch,  reservation) 
76-             DataType :: Binary  => merge_helper!( BinaryArray ,  sort,  streams,  schema,  metrics,  batch_size,  fetch,  reservation) 
77-             DataType :: LargeBinary  => merge_helper!( LargeBinaryArray ,  sort,  streams,  schema,  metrics,  batch_size,  fetch,  reservation) 
78-             _ => { } 
79-         } 
67+ 
68+     pub  fn  with_streams ( mut  self ,  streams :  Vec < SendableRecordBatchStream > )  -> Self  { 
69+         self . streams  = streams; 
70+         self 
8071    } 
8172
82-     let  streams = RowCursorStream :: try_new ( 
83-         schema. as_ref ( ) , 
84-         expressions, 
85-         streams, 
86-         reservation. new_empty ( ) , 
87-     ) ?; 
88- 
89-     Ok ( Box :: pin ( SortPreservingMergeStream :: new ( 
90-         Box :: new ( streams) , 
91-         schema, 
92-         metrics, 
93-         batch_size, 
94-         fetch, 
95-         reservation, 
96-     ) ) ) 
73+     pub  fn  with_schema ( mut  self ,  schema :  SchemaRef )  -> Self  { 
74+         self . schema  = Some ( schema) ; 
75+         self 
76+     } 
77+ 
78+     pub  fn  with_expressions ( mut  self ,  expressions :  & ' a  [ PhysicalSortExpr ] )  -> Self  { 
79+         self . expressions  = expressions; 
80+         self 
81+     } 
82+ 
83+     pub  fn  with_metrics ( mut  self ,  metrics :  BaselineMetrics )  -> Self  { 
84+         self . metrics  = Some ( metrics) ; 
85+         self 
86+     } 
87+ 
88+     pub  fn  with_batch_size ( mut  self ,  batch_size :  usize )  -> Self  { 
89+         self . batch_size  = Some ( batch_size) ; 
90+         self 
91+     } 
92+ 
93+     pub  fn  with_fetch ( mut  self ,  fetch :  Option < usize > )  -> Self  { 
94+         self . fetch  = fetch; 
95+         self 
96+     } 
97+ 
98+     pub  fn  with_reservation ( mut  self ,  reservation :  MemoryReservation )  -> Self  { 
99+         self . reservation  = Some ( reservation) ; 
100+         self 
101+     } 
102+ 
103+     pub  fn  build ( self )  -> Result < SendableRecordBatchStream >  { 
104+         let  Self  { 
105+             streams, 
106+             schema, 
107+             metrics, 
108+             batch_size, 
109+             reservation, 
110+             fetch, 
111+             expressions, 
112+         }  = self ; 
113+ 
114+         // Early return if streams or expressions are empty 
115+         let  checks = [ 
116+             ( 
117+                 streams. is_empty ( ) , 
118+                 "Streams cannot be empty for streaming merge" , 
119+             ) , 
120+             ( 
121+                 expressions. is_empty ( ) , 
122+                 "Sort expressions cannot be empty for streaming merge" , 
123+             ) , 
124+         ] ; 
125+ 
126+         if  let  Some ( ( _,  error_message) )  = checks. iter ( ) . find ( |( condition,  _) | * condition) 
127+         { 
128+             return  internal_err ! ( "{}" ,  error_message) ; 
129+         } 
130+ 
131+         // Unwrapping mandatory fields 
132+         let  schema = schema. expect ( "Schema cannot be empty for streaming merge" ) ; 
133+         let  metrics = metrics. expect ( "Metrics cannot be empty for streaming merge" ) ; 
134+         let  batch_size =
135+             batch_size. expect ( "Batch size cannot be empty for streaming merge" ) ; 
136+         let  reservation =
137+             reservation. expect ( "Reservation cannot be empty for streaming merge" ) ; 
138+ 
139+         // Special case single column comparisons with optimized cursor implementations 
140+         if  expressions. len ( )  == 1  { 
141+             let  sort = expressions[ 0 ] . clone ( ) ; 
142+             let  data_type = sort. expr . data_type ( schema. as_ref ( ) ) ?; 
143+             downcast_primitive !  { 
144+                 data_type => ( primitive_merge_helper,  sort,  streams,  schema,  metrics,  batch_size,  fetch,  reservation) , 
145+                 DataType :: Utf8  => merge_helper!( StringArray ,  sort,  streams,  schema,  metrics,  batch_size,  fetch,  reservation) 
146+                 DataType :: LargeUtf8  => merge_helper!( LargeStringArray ,  sort,  streams,  schema,  metrics,  batch_size,  fetch,  reservation) 
147+                 DataType :: Binary  => merge_helper!( BinaryArray ,  sort,  streams,  schema,  metrics,  batch_size,  fetch,  reservation) 
148+                 DataType :: LargeBinary  => merge_helper!( LargeBinaryArray ,  sort,  streams,  schema,  metrics,  batch_size,  fetch,  reservation) 
149+                 _ => { } 
150+             } 
151+         } 
152+ 
153+         let  streams = RowCursorStream :: try_new ( 
154+             schema. as_ref ( ) , 
155+             expressions, 
156+             streams, 
157+             reservation. new_empty ( ) , 
158+         ) ?; 
159+         Ok ( Box :: pin ( SortPreservingMergeStream :: new ( 
160+             Box :: new ( streams) , 
161+             schema, 
162+             metrics, 
163+             batch_size, 
164+             fetch, 
165+             reservation, 
166+         ) ) ) 
167+     } 
97168} 
0 commit comments