@@ -82,8 +82,9 @@ use datafusion_physical_expr::PhysicalExprRef;
8282use datafusion_physical_expr_common:: datum:: compare_op_for_nested;
8383
8484use ahash:: RandomState ;
85+ use datafusion_common_runtime:: SpawnedTask ;
8586use datafusion_physical_expr_common:: physical_expr:: fmt_sql;
86- use futures:: { ready, Stream , StreamExt , TryStreamExt } ;
87+ use futures:: { ready, FutureExt , Stream , StreamExt , TryStreamExt } ;
8788use parking_lot:: Mutex ;
8889
8990/// Hard-coded seed to ensure hash values from the hash join differ from `RepartitionExec`, avoiding collisions.
@@ -810,15 +811,22 @@ impl ExecutionPlan for HashJoinExec {
810811 let reservation =
811812 MemoryConsumer :: new ( "HashJoinInput" ) . register ( context. memory_pool ( ) ) ;
812813
813- Ok ( collect_left_input (
814+ let task = collect_left_input (
814815 self . random_state . clone ( ) ,
815816 left_stream,
816817 on_left. clone ( ) ,
817818 join_metrics. clone ( ) ,
818819 reservation,
819820 need_produce_result_in_final ( self . join_type ) ,
820821 self . right ( ) . output_partitioning ( ) . partition_count ( ) ,
821- ) )
822+ ) ;
823+
824+ Ok ( async move {
825+ // Spawn a task the first time the stream is polled for the build phase.
826+ // This ensures the consumer of the join does not poll unnecessarily
827+ // while the build is ongoing
828+ SpawnedTask :: spawn ( task) . map ( |r| r?) . await
829+ } )
822830 } ) ?,
823831 PartitionMode :: Partitioned => {
824832 let left_stream = self . left . execute ( partition, Arc :: clone ( & context) ) ?;
@@ -827,15 +835,22 @@ impl ExecutionPlan for HashJoinExec {
827835 MemoryConsumer :: new ( format ! ( "HashJoinInput[{partition}]" ) )
828836 . register ( context. memory_pool ( ) ) ;
829837
830- OnceFut :: new ( collect_left_input (
838+ let task = collect_left_input (
831839 self . random_state . clone ( ) ,
832840 left_stream,
833841 on_left. clone ( ) ,
834842 join_metrics. clone ( ) ,
835843 reservation,
836844 need_produce_result_in_final ( self . join_type ) ,
837845 1 ,
838- ) )
846+ ) ;
847+
848+ OnceFut :: new ( async move {
849+ // Spawn a task the first time the stream is polled for the build phase.
850+ // This ensures the consumer of the join does not poll unnecessarily
851+ // while the build is ongoing
852+ SpawnedTask :: spawn ( task) . map ( |r| r?) . await
853+ } )
839854 }
840855 PartitionMode :: Auto => {
841856 return plan_err ! (
0 commit comments