@@ -512,14 +512,18 @@ impl GroupedHashAggregateStream {
512512 } ;
513513
514514 // We don't support blocked emission in steaming aggregation mode
515- let emit_tos_builder = if matches ! ( group_ordering, GroupOrdering :: None ) {
516- let group_values_support_blocked_emission =
517- group_values. supports_blocked_emission ( ) ;
518- let accumulators_support_blocked_emission =
515+ // TODO: I am not sure, if we should disable blocked mode if `accumulator`s are empty.
516+ let emit_tos_builder = if matches ! ( group_ordering, GroupOrdering :: None )
517+ && !accumulators. is_empty ( )
518+ {
519+ let is_blocked_group_values = group_values. supports_blocked_emission ( ) ;
520+ let is_blocked_accumulators =
519521 accumulators. iter ( ) . all ( |a| a. supports_blocked_emission ( ) ) ;
522+
523+ // TODO: if the batch size is too small, maybe we should fallback to single block mode.
520524 GroupStatesContext :: new (
521- group_values_support_blocked_emission ,
522- accumulators_support_blocked_emission ,
525+ is_blocked_group_values ,
526+ is_blocked_accumulators ,
523527 batch_size,
524528 )
525529 } else {
@@ -1117,13 +1121,13 @@ pub struct GroupStatesContext {
11171121
11181122impl GroupStatesContext {
11191123 pub fn new (
1120- group_values_support_blocked_emission : bool ,
1121- accumulators_support_blocked_emission : bool ,
1124+ is_blocked_group_values : bool ,
1125+ is_blocked_accumulators : bool ,
11221126 block_size : usize ,
11231127 ) -> Self {
11241128 Self {
1125- is_blocked_group_values : group_values_support_blocked_emission ,
1126- is_blocked_accumulators : accumulators_support_blocked_emission ,
1129+ is_blocked_group_values,
1130+ is_blocked_accumulators,
11271131 block_size,
11281132 }
11291133 }
0 commit comments