Skip to content

Commit cbfda98

Browse files
committed
push_front and push_back
1 parent c1d71c8 commit cbfda98

File tree

4 files changed

+87
-6
lines changed

4 files changed

+87
-6
lines changed

futures-util/src/stream/futures_ordered.rs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -129,17 +129,34 @@ impl<Fut: Future> FuturesOrdered<Fut> {
129129
self.in_progress_queue.is_empty() && self.queued_outputs.is_empty()
130130
}
131131

132-
/// Push a future into the queue.
132+
/// Pushes a future to the back of the queue.
133133
///
134134
/// This function submits the given future to the internal set for managing.
135135
/// This function will not call `poll` on the submitted future. The caller
136136
/// must ensure that `FuturesOrdered::poll` is called in order to receive
137137
/// task notifications.
138-
pub fn push(&mut self, future: Fut) {
138+
pub fn push_back(&mut self, future: Fut) {
139139
let wrapped = OrderWrapper { data: future, index: self.next_incoming_index };
140140
self.next_incoming_index += 1;
141141
self.in_progress_queue.push(wrapped);
142142
}
143+
144+
/// Pushes a future to the front of the queue.
145+
///
146+
/// This function submits the given future to the internal set for managing.
147+
/// This function will not call `poll` on the submitted future. The caller
148+
/// must ensure that `FuturesOrdered::poll` is called in order to receive
149+
/// task notifications. This future will be the next future to be returned
150+
/// complete.
151+
pub fn push_front(&mut self, future: Fut) {
152+
if self.next_outgoing_index == 0 {
153+
self.push_back(future)
154+
} else {
155+
let wrapped = OrderWrapper { data: future, index: self.next_outgoing_index - 1 };
156+
self.next_outgoing_index -= 1;
157+
self.in_progress_queue.push(wrapped);
158+
}
159+
}
143160
}
144161

145162
impl<Fut: Future> Default for FuturesOrdered<Fut> {
@@ -196,7 +213,7 @@ impl<Fut: Future> FromIterator<Fut> for FuturesOrdered<Fut> {
196213
{
197214
let acc = Self::new();
198215
iter.into_iter().fold(acc, |mut acc, item| {
199-
acc.push(item);
216+
acc.push_back(item);
200217
acc
201218
})
202219
}
@@ -214,7 +231,7 @@ impl<Fut: Future> Extend<Fut> for FuturesOrdered<Fut> {
214231
I: IntoIterator<Item = Fut>,
215232
{
216233
for item in iter {
217-
self.push(item);
234+
self.push_back(item);
218235
}
219236
}
220237
}

futures-util/src/stream/stream/buffered.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ where
6969
// our queue of futures.
7070
while this.max.map(|max| this.in_progress_queue.len() < max.get()).unwrap_or(true) {
7171
match this.stream.as_mut().poll_next(cx) {
72-
Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut),
72+
Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut),
7373
Poll::Ready(None) | Poll::Pending => break,
7474
}
7575
}

futures-util/src/stream/try_stream/try_buffered.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ where
5555
// our queue of futures. Propagate errors from the stream immediately.
5656
while this.max.map(|max| this.in_progress_queue.len() < max.get()).unwrap_or(true) {
5757
match this.stream.as_mut().poll_next(cx)? {
58-
Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut.into_future()),
58+
Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut.into_future()),
5959
Poll::Ready(None) | Poll::Pending => break,
6060
}
6161
}

futures/tests/stream_futures_ordered.rs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use futures::channel::oneshot;
22
use futures::executor::{block_on, block_on_stream};
33
use futures::future::{self, join, Future, FutureExt, TryFutureExt};
44
use futures::stream::{FuturesOrdered, StreamExt};
5+
use futures::task::Poll;
56
use futures_test::task::noop_context;
67
use std::any::Any;
78

@@ -45,6 +46,69 @@ fn works_2() {
4546
assert!(stream.poll_next_unpin(&mut cx).is_ready());
4647
}
4748

49+
#[test]
50+
fn test_push_front() {
51+
let (a_tx, a_rx) = oneshot::channel::<i32>();
52+
let (b_tx, b_rx) = oneshot::channel::<i32>();
53+
let (c_tx, c_rx) = oneshot::channel::<i32>();
54+
let (d_tx, d_rx) = oneshot::channel::<i32>();
55+
56+
let mut stream = FuturesOrdered::new();
57+
58+
let mut cx = noop_context();
59+
60+
stream.push_back(a_rx);
61+
stream.push_back(b_rx);
62+
stream.push_back(c_rx);
63+
64+
a_tx.send(1).unwrap();
65+
b_tx.send(2).unwrap();
66+
c_tx.send(3).unwrap();
67+
68+
// 1 and 2 should be received in order
69+
assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx));
70+
assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx));
71+
72+
stream.push_front(d_rx);
73+
d_tx.send(4).unwrap();
74+
75+
// we pushed `d_rx` to the front and sent 4, so we should recieve 4 next
76+
// and then 3 after it
77+
assert_eq!(Poll::Ready(Some(Ok(4))), stream.poll_next_unpin(&mut cx));
78+
assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx));
79+
}
80+
81+
#[test]
82+
fn test_push_back() {
83+
let (a_tx, a_rx) = oneshot::channel::<i32>();
84+
let (b_tx, b_rx) = oneshot::channel::<i32>();
85+
let (c_tx, c_rx) = oneshot::channel::<i32>();
86+
let (d_tx, d_rx) = oneshot::channel::<i32>();
87+
88+
let mut stream = FuturesOrdered::new();
89+
90+
let mut cx = noop_context();
91+
92+
stream.push_back(a_rx);
93+
stream.push_back(b_rx);
94+
stream.push_back(c_rx);
95+
96+
a_tx.send(1).unwrap();
97+
b_tx.send(2).unwrap();
98+
c_tx.send(3).unwrap();
99+
100+
// All results should be received in order
101+
102+
assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx));
103+
assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx));
104+
105+
stream.push_back(d_rx);
106+
d_tx.send(4).unwrap();
107+
108+
assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx));
109+
assert_eq!(Poll::Ready(Some(Ok(4))), stream.poll_next_unpin(&mut cx));
110+
}
111+
48112
#[test]
49113
fn from_iterator() {
50114
let stream = vec![future::ready::<i32>(1), future::ready::<i32>(2), future::ready::<i32>(3)]

0 commit comments

Comments
 (0)