File tree Expand file tree Collapse file tree 1 file changed +19
-16
lines changed
datafusion/core/src/physical_plan Expand file tree Collapse file tree 1 file changed +19
-16
lines changed Original file line number Diff line number Diff line change @@ -97,24 +97,27 @@ pub(crate) fn spawn_buffered(
9797 mut input : SendableRecordBatchStream ,
9898 buffer : usize ,
9999) -> SendableRecordBatchStream {
100- // Use tokio only if running from a tokio context (#2201)
101- if tokio:: runtime:: Handle :: try_current ( ) . is_err ( ) {
102- return input;
103- } ;
104-
105- let mut builder = RecordBatchReceiverStream :: builder ( input. schema ( ) , buffer) ;
106-
107- let sender = builder. tx ( ) ;
100+ // Use tokio only if running from a multi-thread tokio context
101+ match tokio:: runtime:: Handle :: try_current ( ) {
102+ Ok ( handle)
103+ if handle. runtime_flavor ( ) == tokio:: runtime:: RuntimeFlavor :: MultiThread =>
104+ {
105+ let mut builder = RecordBatchReceiverStream :: builder ( input. schema ( ) , buffer) ;
106+
107+ let sender = builder. tx ( ) ;
108+
109+ builder. spawn ( async move {
110+ while let Some ( item) = input. next ( ) . await {
111+ if sender. send ( item) . await . is_err ( ) {
112+ return ;
113+ }
114+ }
115+ } ) ;
108116
109- builder. spawn ( async move {
110- while let Some ( item) = input. next ( ) . await {
111- if sender. send ( item) . await . is_err ( ) {
112- return ;
113- }
117+ builder. build ( )
114118 }
115- } ) ;
116-
117- builder. build ( )
119+ _ => input,
120+ }
118121}
119122
120123/// Computes the statistics for an in-memory RecordBatch
You can’t perform that action at this time.
0 commit comments