Skip to content

Commit d6d50f2

Browse files
committed
Backport to 0.1: FuturesUnordered: Do not poll the same future twice per iteration
Same as #2333. The same issue exists in 0.1, so backporting it there helps for code that is still using Futures 0.1 in some places.
1 parent 6db6642 commit d6d50f2

File tree

2 files changed

+57
-20
lines changed

2 files changed

+57
-20
lines changed

src/stream/futures_unordered.rs

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,6 @@ use {task, Stream, Future, Poll, Async};
1515
use executor::{Notify, UnsafeNotify, NotifyHandle};
1616
use task_impl::{self, AtomicTask};
1717

18-
/// Constant used for a `FuturesUnordered` to determine how many times it is
19-
/// allowed to poll underlying futures without yielding.
20-
///
21-
/// A single call to `poll_next` may potentially do a lot of work before
22-
/// yielding. This happens in particular if the underlying futures are awoken
23-
/// frequently but continue to return `Pending`. This is problematic if other
24-
/// tasks are waiting on the executor, since they do not get to run. This value
25-
/// caps the number of calls to `poll` on underlying futures a single call to
26-
/// `poll_next` is allowed to make.
27-
///
28-
/// The value itself is chosen somewhat arbitrarily. It needs to be high enough
29-
/// that amortize wakeup and scheduling costs, but low enough that we do not
30-
/// starve other tasks for long.
31-
///
32-
/// See also https://github.com/rust-lang/futures-rs/issues/2047.
33-
const YIELD_EVERY: usize = 32;
34-
3518
/// An unbounded set of futures.
3619
///
3720
/// This "combinator" also serves a special function in this library, providing
@@ -291,6 +274,22 @@ impl<T> Stream for FuturesUnordered<T>
291274
type Error = T::Error;
292275

293276
fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
277+
// Variable to determine how many times it is allowed to poll underlying
278+
// futures without yielding.
279+
//
280+
// A single call to `poll_next` may potentially do a lot of work before
281+
// yielding. This happens in particular if the underlying futures are awoken
282+
// frequently but continue to return `Pending`. This is problematic if other
283+
// tasks are waiting on the executor, since they do not get to run. This value
284+
// caps the number of calls to `poll` on underlying futures a single call to
285+
// `poll_next` is allowed to make.
286+
//
287+
// The value is the length of FuturesUnordered. This ensures that each
288+
// future is polled only once at most per iteration.
289+
//
290+
// See also https://github.com/rust-lang/futures-rs/issues/2047.
291+
let yield_every = self.len();
292+
294293
// Keep track of how many child futures we have polled,
295294
// in case we want to forcibly yield.
296295
let mut polled = 0;
@@ -353,7 +352,7 @@ impl<T> Stream for FuturesUnordered<T>
353352
// * The future was extracted above (taken ownership). That way
354353
// if it panics we're guaranteed that the future is
355354
// dropped on this thread and doesn't accidentally get
356-
// dropped on a different thread (bad).
355+
// dropped on YIELD_EVERYa different thread (bad).
357356
// * We unlink the node from our internal queue to preemptively
358357
// assume it'll panic, in which case we'll want to discard it
359358
// regardless.
@@ -398,7 +397,7 @@ impl<T> Stream for FuturesUnordered<T>
398397
*node.future.get() = Some(future);
399398
bomb.queue.link(node);
400399

401-
if polled == YIELD_EVERY {
400+
if polled == yield_every {
402401
// We have polled a large number of futures in a row without yielding.
403402
// To ensure we do not starve other tasks waiting on the executor,
404403
// we yield here, but immediately wake ourselves up to continue.

tests/futures_unordered.rs

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ extern crate futures;
55
use std::any::Any;
66

77
use futures::sync::oneshot;
8-
use futures::stream::futures_unordered;
8+
use std::iter::FromIterator;
9+
use futures::stream::{futures_unordered, FuturesUnordered};
910
use futures::prelude::*;
1011

1112
mod support;
@@ -127,3 +128,40 @@ fn iter_mut_len() {
127128
assert_eq!(iter_mut.len(), 0);
128129
assert!(iter_mut.next().is_none());
129130
}
131+
132+
#[test]
133+
fn polled_only_once_at_most_per_iteration() {
134+
#[derive(Debug, Clone, Copy, Default)]
135+
struct F {
136+
polled: bool,
137+
}
138+
139+
impl Future for F {
140+
type Item = ();
141+
type Error = ();
142+
143+
fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
144+
if self.polled {
145+
panic!("polled twice")
146+
} else {
147+
self.polled = true;
148+
Ok(Async::NotReady)
149+
}
150+
}
151+
}
152+
153+
154+
let tasks = FuturesUnordered::from_iter(vec![F::default(); 10]);
155+
let mut tasks = futures::executor::spawn(tasks);
156+
assert!(tasks.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready());
157+
assert_eq!(10, tasks.get_mut().iter_mut().filter(|f| f.polled).count());
158+
159+
let tasks = FuturesUnordered::from_iter(vec![F::default(); 33]);
160+
let mut tasks = futures::executor::spawn(tasks);
161+
assert!(tasks.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready());
162+
assert_eq!(33, tasks.get_mut().iter_mut().filter(|f| f.polled).count());
163+
164+
let tasks = FuturesUnordered::<F>::new();
165+
let mut tasks = futures::executor::spawn(tasks);
166+
assert!(tasks.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_ready());
167+
}

0 commit comments

Comments
 (0)