diff --git a/timely/src/execute.rs b/timely/src/execute.rs index a665e2f34..82c12559b 100644 --- a/timely/src/execute.rs +++ b/timely/src/execute.rs @@ -155,7 +155,9 @@ where let alloc = crate::communication::allocator::thread::Thread::new(); let mut worker = crate::worker::Worker::new(WorkerConfig::default(), alloc); let result = func(&mut worker); - while worker.step_or_park(None) { } + while worker.has_dataflows() { + worker.step_or_park(None); + } result } @@ -283,7 +285,9 @@ where } let result = func(&mut worker); - while worker.step_or_park(None) { } + while worker.has_dataflows() { + worker.step_or_park(None); + } result }) } @@ -382,7 +386,9 @@ where initialize_from(builders, others, move |allocator| { let mut worker = Worker::new(worker_config.clone(), allocator); let result = func(&mut worker); - while worker.step_or_park(None) { } + while worker.has_dataflows() { + worker.step_or_park(None); + } result }) } diff --git a/timely/src/worker.rs b/timely/src/worker.rs index d3d01fa2b..e5f6e7321 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -366,7 +366,7 @@ impl Worker { (x, y) => x.or(y), }; - if !self.dataflows.borrow().is_empty() && delay != Some(Duration::new(0,0)) { + if delay != Some(Duration::new(0,0)) { // Log parking and flush log. if let Some(l) = self.logging().as_mut() { @@ -698,6 +698,11 @@ impl Worker { self.dataflows.borrow().keys().cloned().collect() } + /// True if there is at least one dataflow under management. + pub fn has_dataflows(&self) -> bool { + !self.dataflows.borrow().is_empty() + } + // Acquire a new distinct dataflow identifier. fn allocate_dataflow_index(&mut self) -> usize { *self.dataflow_counter.borrow_mut() += 1;