Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ jobs:
run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }}
# cargo does not support for --features/--no-default-features with workspace, so use cargo-hack instead.
# Refs: cargo#3620, cargo#4106, cargo#4463, cargo#4753, cargo#5015, cargo#5364, cargo#6195
- run: cargo install cargo-hack
- run: cargo +stable install cargo-hack
# remove dev-dependencies to avoid https://github.com/rust-lang/cargo/issues/4866
- run: cargo hack --remove-dev-deps --workspace
# Check no-default-features
Expand Down Expand Up @@ -81,7 +81,7 @@ jobs:
- uses: actions/checkout@v2
- name: Install Rust
run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }}
- run: cargo install cargo-hack
- run: cargo +stable install cargo-hack
# remove dev-dependencies to avoid https://github.com/rust-lang/cargo/issues/4866
- run: cargo hack --remove-dev-deps --workspace
# Check no-default-features
Expand All @@ -108,7 +108,7 @@ jobs:
- uses: actions/checkout@v2
- name: Install Rust
run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }}
- run: cargo install cargo-hack
- run: cargo +stable install cargo-hack
- run: cargo hack build --workspace --no-dev-deps

build:
Expand Down
2 changes: 1 addition & 1 deletion .rustfmt.toml
Original file line number Diff line number Diff line change
@@ -1 +1 @@
disable_all_formatting = true
use_small_heuristics = "Max"
6 changes: 2 additions & 4 deletions examples/functional/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ fn main() {
// responsible for transmission
pool.spawn_ok(fut_tx_result);

let fut_values = rx
.map(|v| v * 2)
.collect();
let fut_values = rx.map(|v| v * 2).collect();

// Use the executor provided to this async block to wait for the
// future to complete.
Expand All @@ -45,4 +43,4 @@ fn main() {
let values: Vec<i32> = executor::block_on(fut_values);

println!("Values={:?}", values);
}
}
4 changes: 2 additions & 2 deletions examples/imperative/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ fn main() {
// of the stream to be available.
while let Some(v) = rx.next().await {
pending.push(v * 2);
};
}

pending
};
Expand All @@ -45,4 +45,4 @@ fn main() {
let values: Vec<i32> = executor::block_on(fut_values);

println!("Values={:?}", values);
}
}
15 changes: 3 additions & 12 deletions futures-channel/benches/sync_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use {
futures::{
channel::mpsc::{self, Sender, UnboundedSender},
ready,
stream::{Stream, StreamExt},
sink::Sink,
stream::{Stream, StreamExt},
task::{Context, Poll},
},
futures_test::task::noop_context,
Expand All @@ -25,7 +25,6 @@ fn unbounded_1_tx(b: &mut Bencher) {
// 1000 iterations to avoid measuring overhead of initialization
// Result should be divided by 1000
for i in 0..1000 {

// Poll, not ready, park
assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx));

Expand Down Expand Up @@ -73,7 +72,6 @@ fn unbounded_uncontended(b: &mut Bencher) {
})
}


/// A Stream that continuously sends incrementing number of the queue
struct TestSender {
tx: Sender<u32>,
Expand All @@ -84,9 +82,7 @@ struct TestSender {
impl Stream for TestSender {
type Item = u32;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Self::Item>>
{
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = &mut *self;
let mut tx = Pin::new(&mut this.tx);

Expand Down Expand Up @@ -123,12 +119,7 @@ fn bounded_100_tx(b: &mut Bencher) {
// Each sender can send one item after specified capacity
let (tx, mut rx) = mpsc::channel(0);

let mut tx: Vec<_> = (0..100).map(|_| {
TestSender {
tx: tx.clone(),
last: 0
}
}).collect();
let mut tx: Vec<_> = (0..100).map(|_| TestSender { tx: tx.clone(), last: 0 }).collect();

for i in 0..10 {
for x in &mut tx {
Expand Down
2 changes: 0 additions & 2 deletions futures-channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
//! library is activated, and it is activated by default.

#![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))]

#![cfg_attr(not(feature = "std"), no_std)]

#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)]
// It cannot be included in the published code because this lints have false positives in the minimum required version.
#![cfg_attr(test, warn(single_use_lifetimes))]
Expand Down
10 changes: 3 additions & 7 deletions futures-channel/tests/channel.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use futures::channel::mpsc;
use futures::executor::block_on;
use futures::future::poll_fn;
use futures::stream::StreamExt;
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

Expand All @@ -11,9 +11,7 @@ fn sequence() {
let (tx, rx) = mpsc::channel(1);

let amt = 20;
let t = thread::spawn(move || {
block_on(send_sequence(amt, tx))
});
let t = thread::spawn(move || block_on(send_sequence(amt, tx)));
let list: Vec<_> = block_on(rx.collect());
let mut list = list.into_iter();
for i in (1..=amt).rev() {
Expand All @@ -34,9 +32,7 @@ async fn send_sequence(n: u32, mut sender: mpsc::Sender<u32>) {
fn drop_sender() {
let (tx, mut rx) = mpsc::channel::<u32>(1);
drop(tx);
let f = poll_fn(|cx| {
rx.poll_next_unpin(cx)
});
let f = poll_fn(|cx| rx.poll_next_unpin(cx));
assert_eq!(block_on(f), None)
}

Expand Down
28 changes: 13 additions & 15 deletions futures-channel/tests/mpsc-close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ use std::time::{Duration, Instant};
fn smoke() {
let (mut sender, receiver) = mpsc::channel(1);

let t = thread::spawn(move || {
while let Ok(()) = block_on(sender.send(42)) {}
});
let t = thread::spawn(move || while let Ok(()) = block_on(sender.send(42)) {});

// `receiver` needs to be dropped for `sender` to stop sending and therefore before the join.
block_on(receiver.take(3).for_each(|_| futures::future::ready(())));
Expand Down Expand Up @@ -166,7 +164,7 @@ fn stress_try_send_as_receiver_closes() {
struct TestRx {
rx: mpsc::Receiver<Arc<()>>,
// The number of times to query `rx` before dropping it.
poll_count: usize
poll_count: usize,
}
struct TestTask {
command_rx: mpsc::Receiver<TestRx>,
Expand All @@ -190,14 +188,11 @@ fn stress_try_send_as_receiver_closes() {
impl Future for TestTask {
type Output = ();

fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Poll the test channel, if one is present.
if let Some(rx) = &mut self.test_rx {
if let Poll::Ready(v) = rx.poll_next_unpin(cx) {
let _ = v.expect("test finished unexpectedly!");
let _ = v.expect("test finished unexpectedly!");
}
self.countdown -= 1;
// Busy-poll until the countdown is finished.
Expand All @@ -209,9 +204,9 @@ fn stress_try_send_as_receiver_closes() {
self.test_rx = Some(rx);
self.countdown = poll_count;
cx.waker().wake_by_ref();
},
}
Poll::Ready(None) => return Poll::Ready(()),
Poll::Pending => {},
Poll::Pending => {}
}
if self.countdown == 0 {
// Countdown complete -- drop the Receiver.
Expand Down Expand Up @@ -255,10 +250,14 @@ fn stress_try_send_as_receiver_closes() {
if prev_weak.upgrade().is_none() {
break;
}
assert!(t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S),
assert!(
t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S),
"item not dropped on iteration {} after \
{} sends ({} successful). spin=({})",
i, attempted_sends, successful_sends, spins
i,
attempted_sends,
successful_sends,
spins
);
spins += 1;
thread::sleep(Duration::from_millis(SPIN_SLEEP_MS));
Expand All @@ -273,8 +272,7 @@ fn stress_try_send_as_receiver_closes() {
}
}
drop(cmd_tx);
bg.join()
.expect("background thread join");
bg.join().expect("background thread join");
}

#[test]
Expand Down
33 changes: 15 additions & 18 deletions futures-channel/tests/mpsc.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use futures::channel::{mpsc, oneshot};
use futures::executor::{block_on, block_on_stream};
use futures::future::{FutureExt, poll_fn};
use futures::stream::{Stream, StreamExt};
use futures::future::{poll_fn, FutureExt};
use futures::pin_mut;
use futures::sink::{Sink, SinkExt};
use futures::stream::{Stream, StreamExt};
use futures::task::{Context, Poll};
use futures::pin_mut;
use futures_test::task::{new_count_waker, noop_context};
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

trait AssertSend: Send {}
Expand Down Expand Up @@ -77,7 +77,7 @@ fn send_shared_recv() {
fn send_recv_threads() {
let (mut tx, rx) = mpsc::channel::<i32>(16);

let t = thread::spawn(move|| {
let t = thread::spawn(move || {
block_on(tx.send(1)).unwrap();
});

Expand Down Expand Up @@ -204,7 +204,7 @@ fn stress_shared_unbounded() {
const NTHREADS: u32 = 8;
let (tx, rx) = mpsc::unbounded::<i32>();

let t = thread::spawn(move|| {
let t = thread::spawn(move || {
let result: Vec<_> = block_on(rx.collect());
assert_eq!(result.len(), (AMT * NTHREADS) as usize);
for item in result {
Expand All @@ -215,7 +215,7 @@ fn stress_shared_unbounded() {
for _ in 0..NTHREADS {
let tx = tx.clone();

thread::spawn(move|| {
thread::spawn(move || {
for _ in 0..AMT {
tx.unbounded_send(1).unwrap();
}
Expand All @@ -233,7 +233,7 @@ fn stress_shared_bounded_hard() {
const NTHREADS: u32 = 8;
let (tx, rx) = mpsc::channel::<i32>(0);

let t = thread::spawn(move|| {
let t = thread::spawn(move || {
let result: Vec<_> = block_on(rx.collect());
assert_eq!(result.len(), (AMT * NTHREADS) as usize);
for item in result {
Expand Down Expand Up @@ -297,9 +297,9 @@ fn stress_receiver_multi_task_bounded_hard() {
}
Poll::Ready(None) => {
*rx_opt = None;
break
},
Poll::Pending => {},
break;
}
Poll::Pending => {}
}
}
} else {
Expand All @@ -311,7 +311,6 @@ fn stress_receiver_multi_task_bounded_hard() {
th.push(t);
}


for i in 0..AMT {
block_on(tx.send(i)).unwrap();
}
Expand All @@ -328,7 +327,7 @@ fn stress_receiver_multi_task_bounded_hard() {
/// after sender dropped.
#[test]
fn stress_drop_sender() {
fn list() -> impl Stream<Item=i32> {
fn list() -> impl Stream<Item = i32> {
let (tx, rx) = mpsc::channel(1);
thread::spawn(move || {
block_on(send_one_two_three(tx));
Expand Down Expand Up @@ -407,9 +406,7 @@ fn stress_poll_ready() {
let mut threads = Vec::new();
for _ in 0..NTHREADS {
let sender = tx.clone();
threads.push(thread::spawn(move || {
block_on(stress_poll_ready_sender(sender, AMT))
}));
threads.push(thread::spawn(move || block_on(stress_poll_ready_sender(sender, AMT))));
}
drop(tx);

Expand All @@ -436,7 +433,7 @@ fn try_send_1() {
for i in 0..N {
loop {
if tx.try_send(i).is_ok() {
break
break;
}
}
}
Expand Down Expand Up @@ -542,8 +539,8 @@ fn is_connected_to() {

#[test]
fn hash_receiver() {
use std::hash::Hasher;
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;

let mut hasher_a1 = DefaultHasher::new();
let mut hasher_a2 = DefaultHasher::new();
Expand Down
4 changes: 2 additions & 2 deletions futures-channel/tests/oneshot.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use futures::channel::oneshot::{self, Sender};
use futures::executor::block_on;
use futures::future::{FutureExt, poll_fn};
use futures::future::{poll_fn, FutureExt};
use futures::task::{Context, Poll};
use futures_test::task::panic_waker_ref;
use std::sync::mpsc;
Expand Down Expand Up @@ -70,7 +70,7 @@ fn close() {
rx.close();
block_on(poll_fn(|cx| {
match rx.poll_unpin(cx) {
Poll::Ready(Err(_)) => {},
Poll::Ready(Err(_)) => {}
_ => panic!(),
};
assert!(tx.poll_canceled(cx).is_ready());
Expand Down
10 changes: 4 additions & 6 deletions futures-core/src/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,12 @@ pub trait TryFuture: Future + private_try_future::Sealed {
/// This method is a stopgap for a compiler limitation that prevents us from
/// directly inheriting from the `Future` trait; in the future it won't be
/// needed.
fn try_poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Ok, Self::Error>>;
fn try_poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Self::Ok, Self::Error>>;
}

impl<F, T, E> TryFuture for F
where F: ?Sized + Future<Output = Result<T, E>>
where
F: ?Sized + Future<Output = Result<T, E>>,
{
type Ok = T;
type Error = E;
Expand All @@ -87,8 +85,8 @@ impl<F, T, E> TryFuture for F

#[cfg(feature = "alloc")]
mod if_alloc {
use alloc::boxed::Box;
use super::*;
use alloc::boxed::Box;

impl<F: FusedFuture + ?Sized + Unpin> FusedFuture for Box<F> {
fn is_terminated(&self) -> bool {
Expand Down
Loading