Skip to content

Commit ae070f7

Browse files
committed
Restore Unpin requirement, fix rest of exports
1 parent c21306c commit ae070f7

File tree

6 files changed

+29
-26
lines changed

6 files changed

+29
-26
lines changed

futures-util/src/stream/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream};
1818
#[allow(clippy::module_inception)]
1919
mod stream;
2020
pub use self::stream::{
21-
Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach,
22-
Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan, SelectNextSome,
23-
Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, TryFold,
24-
TryForEach, Unzip, Zip,
21+
All, Any, Chain, Collect, Concat, Count, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten,
22+
Fold, ForEach, Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan,
23+
SelectNextSome, Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then,
24+
TryFold, TryForEach, Unzip, Zip,
2525
};
2626

2727
#[cfg(feature = "std")]

futures-util/src/stream/stream/flatten_unordered.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use alloc::{boxed::Box, sync::Arc};
1+
use alloc::sync::Arc;
22
use core::{
33
cell::UnsafeCell,
44
convert::identity,
@@ -285,7 +285,7 @@ pin_project! {
285285
#[must_use = "streams do nothing unless polled"]
286286
pub struct FlattenUnordered<St> where St: Stream {
287287
#[pin]
288-
inner_streams: FuturesUnordered<PollStreamFut<Pin<Box<St::Item>>>>,
288+
inner_streams: FuturesUnordered<PollStreamFut<St::Item>>,
289289
#[pin]
290290
stream: St,
291291
poll_state: SharedPollState,
@@ -315,7 +315,7 @@ where
315315
impl<St> FlattenUnordered<St>
316316
where
317317
St: Stream,
318-
St::Item: Stream,
318+
St::Item: Stream + Unpin,
319319
{
320320
pub(super) fn new(stream: St, limit: Option<usize>) -> FlattenUnordered<St> {
321321
let poll_state = SharedPollState::new(NEED_TO_POLL_STREAM);
@@ -346,7 +346,7 @@ impl<St> FlattenUnorderedProj<'_, St>
346346
where
347347
St: Stream,
348348
{
349-
/// Checks if current `inner_streams` size is greater than optional limit.
349+
/// Checks if current `inner_streams` bucket size is greater than optional limit.
350350
fn is_exceeded_limit(&self) -> bool {
351351
self.limit.map_or(false, |limit| self.inner_streams.len() >= limit.get())
352352
}
@@ -355,7 +355,7 @@ where
355355
impl<St> FusedStream for FlattenUnordered<St>
356356
where
357357
St: FusedStream,
358-
St::Item: Stream,
358+
St::Item: Stream + Unpin,
359359
{
360360
fn is_terminated(&self) -> bool {
361361
self.stream.is_terminated() && self.inner_streams.is_empty()
@@ -365,7 +365,7 @@ where
365365
impl<St> Stream for FlattenUnordered<St>
366366
where
367367
St: Stream,
368-
St::Item: Stream,
368+
St::Item: Stream + Unpin,
369369
{
370370
type Item = <St::Item as Stream>::Item;
371371

@@ -410,7 +410,7 @@ where
410410

411411
match this.stream.as_mut().poll_next(&mut cx) {
412412
Poll::Ready(Some(inner_stream)) => {
413-
let next_item_fut = PollStreamFut::new(Box::pin(inner_stream));
413+
let next_item_fut = PollStreamFut::new(inner_stream);
414414
// Add new stream to the inner streams bucket
415415
this.inner_streams.as_mut().push(next_item_fut);
416416
// Inner streams must be polled afterward

futures-util/src/stream/stream/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ delegate_all!(
219219
FlatMapUnordered<St, U, F>(
220220
FlattenUnordered<Map<St, F>>
221221
): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, limit: Option<usize>, f: F| FlattenUnordered::new(Map::new(x, f), limit)]
222-
where St: Stream, U: Stream, F: FnMut(St::Item) -> U
222+
where St: Stream, U: Stream, U: Unpin, F: FnMut(St::Item) -> U
223223
);
224224

225225
#[cfg(not(futures_no_atomic_cas))]
@@ -832,7 +832,7 @@ pub trait StreamExt: Stream {
832832
#[cfg(feature = "alloc")]
833833
fn flatten_unordered(self, limit: impl Into<Option<usize>>) -> FlattenUnordered<Self>
834834
where
835-
Self::Item: Stream,
835+
Self::Item: Stream + Unpin,
836836
Self: Sized,
837837
{
838838
assert_stream::<<Self::Item as Stream>::Item, _>(FlattenUnordered::new(self, limit.into()))
@@ -918,7 +918,7 @@ pub trait StreamExt: Stream {
918918
f: F,
919919
) -> FlatMapUnordered<Self, U, F>
920920
where
921-
U: Stream,
921+
U: Stream + Unpin,
922922
F: FnMut(Self::Item) -> U,
923923
Self: Sized,
924924
{

futures-util/src/stream/try_stream/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -687,7 +687,7 @@ pub trait TryStreamExt: TryStream {
687687
limit: impl Into<Option<usize>>,
688688
) -> TryFlattenUnordered<Self, I, E>
689689
where
690-
Self::Ok: futures_core::Stream<Item = Result<I, E>>,
690+
Self::Ok: futures_core::Stream<Item = Result<I, E>> + Unpin,
691691
E: From<Self::Error>,
692692
Self: Sized,
693693
{

futures-util/src/stream/try_stream/try_flatten_unordered.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,16 @@ use crate::StreamExt;
1515
delegate_all!(
1616
/// Stream for the [`try_flatten_unordered`](super::TryStreamExt::try_flatten_unordered) method.
1717
TryFlattenUnordered<St, I, E>(
18-
FlattenUnordered<TryFlattenSuccessful<St, I, E>>
18+
FlattenUnordered<ResultToEither<St, I, E>>
1919
): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)]
2020
+ New[
2121
|stream: St, limit: impl Into<Option<usize>>|
22-
TryFlattenSuccessful::new(stream).flatten_unordered(limit)
22+
ResultToEither::new(stream).flatten_unordered(limit)
2323
]
2424
where
2525
St: TryStream,
2626
St::Ok: Stream<Item = Result<I, E>>,
27+
St::Ok: Unpin,
2728
E: From<St::Error>
2829
);
2930

@@ -32,21 +33,22 @@ pin_project! {
3233
/// This's a wrapper for `FlattenUnordered` to reuse its logic over `TryStream`.
3334
#[derive(Debug)]
3435
#[must_use = "streams do nothing unless polled"]
35-
pub struct TryFlattenSuccessful<St, I, E>
36+
pub struct ResultToEither<St, I, E>
3637
where
3738
St: TryStream,
3839
St::Ok: Stream<Item = Result<I, E>>,
40+
St::Ok: Unpin,
3941
E: From<St::Error>
4042
{
4143
#[pin]
4244
stream: St,
4345
}
4446
}
4547

46-
impl<St, I, E> TryFlattenSuccessful<St, I, E>
48+
impl<St, I, E> ResultToEither<St, I, E>
4749
where
4850
St: TryStream,
49-
St::Ok: Stream<Item = Result<I, E>>,
51+
St::Ok: Stream<Item = Result<I, E>> + Unpin,
5052
E: From<St::Error>,
5153
{
5254
fn new(stream: St) -> Self {
@@ -56,10 +58,10 @@ where
5658
delegate_access_inner!(stream, St, ());
5759
}
5860

59-
impl<St, I, E> FusedStream for TryFlattenSuccessful<St, I, E>
61+
impl<St, I, E> FusedStream for ResultToEither<St, I, E>
6062
where
6163
St: TryStream + FusedStream,
62-
St::Ok: Stream<Item = Result<I, E>>,
64+
St::Ok: Stream<Item = Result<I, E>> + Unpin,
6365
E: From<St::Error>,
6466
{
6567
fn is_terminated(&self) -> bool {
@@ -89,10 +91,10 @@ type SingleResult<St> = Single<
8991
Result<<<St as TryStream>::Ok as TryStream>::Ok, <<St as TryStream>::Ok as TryStream>::Error>,
9092
>;
9193

92-
impl<St, I, E> Stream for TryFlattenSuccessful<St, I, E>
94+
impl<St, I, E> Stream for ResultToEither<St, I, E>
9395
where
9496
St: TryStream,
95-
St::Ok: Stream<Item = Result<I, E>>,
97+
St::Ok: Stream<Item = Result<I, E>> + Unpin,
9698
E: From<St::Error>,
9799
{
98100
// Item is either an inner stream or a stream containing a single error.
@@ -119,10 +121,10 @@ where
119121

120122
// Forwarding impl of Sink from the underlying stream
121123
#[cfg(feature = "sink")]
122-
impl<St, I, E, Item> Sink<Item> for TryFlattenSuccessful<St, I, E>
124+
impl<St, I, E, Item> Sink<Item> for ResultToEither<St, I, E>
123125
where
124126
St: TryStream + Sink<Item>,
125-
St::Ok: Stream<Item = Result<I, E>>,
127+
St::Ok: Stream<Item = Result<I, E>> + Unpin,
126128
E: From<<St as TryStream>::Error>,
127129
{
128130
type Error = <St as Sink<Item>>::Error;

futures/tests/stream_try_stream.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ fn try_flatten_unordered() {
5454
Err(val)
5555
}
5656
})
57+
.map_ok(Box::pin)
5758
.try_flatten_unordered(None);
5859

5960
block_on(async move {

0 commit comments

Comments
 (0)