diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index edf4396c8..50557c4bb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -230,42 +230,6 @@ jobs: --workspace --exclude futures-test \ --features unstable --ignore-unknown-features - # When this job failed, run ci/no_atomic_cas.sh and commit result changes. - codegen: - runs-on: ubuntu-latest - permissions: - contents: write - pull-requests: write - steps: - - uses: actions/checkout@v4 - - name: Install Rust - run: rustup update nightly && rustup default nightly - - run: ci/no_atomic_cas.sh - - run: git add -N . && git diff --exit-code - if: github.repository_owner != 'rust-lang' || github.event_name != 'schedule' - - id: diff - run: | - git config user.name "Taiki Endo" - git config user.email "te316e89@gmail.com" - git add -N . - if ! git diff --exit-code; then - git add . - git commit -m "Update no_atomic_cas.rs" - echo "::set-output name=success::false" - fi - if: github.repository_owner == 'rust-lang' && github.event_name == 'schedule' - - uses: peter-evans/create-pull-request@v5 - with: - title: Update no_atomic_cas.rs - body: | - Auto-generated by [create-pull-request][1] - [Please close and immediately reopen this pull request to run CI.][2] - - [1]: https://github.com/peter-evans/create-pull-request - [2]: https://github.com/peter-evans/create-pull-request/blob/HEAD/docs/concepts-guidelines.md#workarounds-to-trigger-further-workflow-runs - branch: update-no-atomic-cas-rs - if: github.repository_owner == 'rust-lang' && github.event_name == 'schedule' && steps.diff.outputs.success == 'false' - miri: name: cargo miri test runs-on: ubuntu-latest diff --git a/CHANGELOG.md b/CHANGELOG.md index e689f3691..a1b633b2b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +# 0.3.30 - 2023-12-24 + +* Add `{BiLock,SplitStream,SplitSink,ReadHalf,WriteHalf}::is_pair_of` (#2797) +* Fix panic in `FuturesUnordered::clear` (#2809) +* Fix panic in `AsyncBufReadExt::fill_buf` (#2801, #2812) +* Improve support for targets without atomic CAS (#2811) +* Remove build scripts (#2811) + # 0.3.29 - 2023-10-26 * Add `TryStreamExt::try_ready_chunks` (#2757) diff --git a/ci/no_atomic_cas.sh b/ci/no_atomic_cas.sh deleted file mode 100755 index ba0200d69..000000000 --- a/ci/no_atomic_cas.sh +++ /dev/null @@ -1,31 +0,0 @@ -#!/bin/bash -set -euo pipefail -IFS=$'\n\t' -cd "$(dirname "$0")"/.. - -# Update the list of targets that do not support atomic CAS operations. -# -# Usage: -# ./ci/no_atomic_cas.sh - -file="no_atomic_cas.rs" - -no_atomic_cas=() -for target in $(rustc --print target-list); do - target_spec=$(rustc --print target-spec-json -Z unstable-options --target "${target}") - res=$(jq <<<"${target_spec}" -r 'select(."atomic-cas" == false)') - [[ -z "${res}" ]] || no_atomic_cas+=("${target}") -done - -cat >"${file}" <>"${file}" -done -cat >>"${file}" < target, - Err(e) => { - println!( - "cargo:warning={}: unable to get TARGET environment variable: {}", - env!("CARGO_PKG_NAME"), - e - ); - return; - } - }; - - // Note that this is `no_*`, not `has_*`. This allows treating - // `cfg(target_has_atomic = "ptr")` as true when the build script doesn't - // run. This is needed for compatibility with non-cargo build systems that - // don't run the build script. - if NO_ATOMIC_CAS.contains(&&*target) { - println!("cargo:rustc-cfg=futures_no_atomic_cas"); - } - - println!("cargo:rerun-if-changed=no_atomic_cas.rs"); -} diff --git a/futures-channel/no_atomic_cas.rs b/futures-channel/no_atomic_cas.rs deleted file mode 120000 index 3d7380fad..000000000 --- a/futures-channel/no_atomic_cas.rs +++ /dev/null @@ -1 +0,0 @@ -../no_atomic_cas.rs \ No newline at end of file diff --git a/futures-channel/src/lib.rs b/futures-channel/src/lib.rs index 4cd936d55..f611e6b9f 100644 --- a/futures-channel/src/lib.rs +++ b/futures-channel/src/lib.rs @@ -27,16 +27,16 @@ ) ))] -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] extern crate alloc; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] mod lock; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "std")] pub mod mpsc; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] pub mod oneshot; diff --git a/futures-core/Cargo.toml b/futures-core/Cargo.toml index 704861cc3..0e27db0a6 100644 --- a/futures-core/Cargo.toml +++ b/futures-core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "futures-core" -version = "0.3.29" +version = "0.3.30" edition = "2018" rust-version = "1.36" license = "MIT OR Apache-2.0" diff --git a/futures-core/build.rs b/futures-core/build.rs deleted file mode 100644 index 05e0496d9..000000000 --- a/futures-core/build.rs +++ /dev/null @@ -1,41 +0,0 @@ -// The rustc-cfg listed below are considered public API, but it is *unstable* -// and outside of the normal semver guarantees: -// -// - `futures_no_atomic_cas` -// Assume the target does *not* support atomic CAS operations. -// This is usually detected automatically by the build script, but you may -// need to enable it manually when building for custom targets or using -// non-cargo build systems that don't run the build script. -// -// With the exceptions mentioned above, the rustc-cfg emitted by the build -// script are *not* public API. - -#![warn(rust_2018_idioms, single_use_lifetimes)] - -use std::env; - -include!("no_atomic_cas.rs"); - -fn main() { - let target = match env::var("TARGET") { - Ok(target) => target, - Err(e) => { - println!( - "cargo:warning={}: unable to get TARGET environment variable: {}", - env!("CARGO_PKG_NAME"), - e - ); - return; - } - }; - - // Note that this is `no_*`, not `has_*`. This allows treating - // `cfg(target_has_atomic = "ptr")` as true when the build script doesn't - // run. This is needed for compatibility with non-cargo build systems that - // don't run the build script. - if NO_ATOMIC_CAS.contains(&&*target) { - println!("cargo:rustc-cfg=futures_no_atomic_cas"); - } - - println!("cargo:rerun-if-changed=no_atomic_cas.rs"); -} diff --git a/futures-core/no_atomic_cas.rs b/futures-core/no_atomic_cas.rs deleted file mode 120000 index 3d7380fad..000000000 --- a/futures-core/no_atomic_cas.rs +++ /dev/null @@ -1 +0,0 @@ -../no_atomic_cas.rs \ No newline at end of file diff --git a/futures-core/src/task/__internal/mod.rs b/futures-core/src/task/__internal/mod.rs index 377f3e286..c24874228 100644 --- a/futures-core/src/task/__internal/mod.rs +++ b/futures-core/src/task/__internal/mod.rs @@ -1,4 +1,7 @@ -#[cfg(any(not(futures_no_atomic_cas), feature = "portable-atomic"))] +#[cfg_attr(target_os = "none", cfg(any(target_has_atomic = "ptr", feature = "portable-atomic")))] mod atomic_waker; -#[cfg(any(not(futures_no_atomic_cas), feature = "portable-atomic"))] +#[cfg_attr( + target_os = "none", + cfg(any(target_has_atomic = "ptr", feature = "portable-atomic")) +)] pub use self::atomic_waker::AtomicWaker; diff --git a/futures-executor/Cargo.toml b/futures-executor/Cargo.toml index 22cf99636..391a5adc4 100644 --- a/futures-executor/Cargo.toml +++ b/futures-executor/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "futures-executor" -version = "0.3.29" +version = "0.3.30" edition = "2018" rust-version = "1.56" license = "MIT OR Apache-2.0" @@ -16,9 +16,9 @@ std = ["futures-core/std", "futures-task/std", "futures-util/std"] thread-pool = ["std", "num_cpus"] [dependencies] -futures-core = { path = "../futures-core", version = "0.3.29", default-features = false } -futures-task = { path = "../futures-task", version = "0.3.29", default-features = false } -futures-util = { path = "../futures-util", version = "0.3.29", default-features = false } +futures-core = { path = "../futures-core", version = "0.3.30", default-features = false } +futures-task = { path = "../futures-task", version = "0.3.30", default-features = false } +futures-util = { path = "../futures-util", version = "0.3.30", default-features = false } num_cpus = { version = "1.8.0", optional = true } [dev-dependencies] diff --git a/futures-io/Cargo.toml b/futures-io/Cargo.toml index a0699de52..9911316ac 100644 --- a/futures-io/Cargo.toml +++ b/futures-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "futures-io" -version = "0.3.29" +version = "0.3.30" edition = "2018" rust-version = "1.36" license = "MIT OR Apache-2.0" diff --git a/futures-macro/Cargo.toml b/futures-macro/Cargo.toml index f180d9e9f..6bd4baff1 100644 --- a/futures-macro/Cargo.toml +++ b/futures-macro/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "futures-macro" -version = "0.3.29" +version = "0.3.30" edition = "2018" rust-version = "1.56" license = "MIT OR Apache-2.0" diff --git a/futures-sink/Cargo.toml b/futures-sink/Cargo.toml index dbf5e91be..6994ba6fc 100644 --- a/futures-sink/Cargo.toml +++ b/futures-sink/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "futures-sink" -version = "0.3.29" +version = "0.3.30" edition = "2018" rust-version = "1.36" license = "MIT OR Apache-2.0" diff --git a/futures-task/Cargo.toml b/futures-task/Cargo.toml index b3bf447d3..4116a3144 100644 --- a/futures-task/Cargo.toml +++ b/futures-task/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "futures-task" -version = "0.3.29" +version = "0.3.30" edition = "2018" rust-version = "1.56" license = "MIT OR Apache-2.0" diff --git a/futures-task/build.rs b/futures-task/build.rs deleted file mode 100644 index 05e0496d9..000000000 --- a/futures-task/build.rs +++ /dev/null @@ -1,41 +0,0 @@ -// The rustc-cfg listed below are considered public API, but it is *unstable* -// and outside of the normal semver guarantees: -// -// - `futures_no_atomic_cas` -// Assume the target does *not* support atomic CAS operations. -// This is usually detected automatically by the build script, but you may -// need to enable it manually when building for custom targets or using -// non-cargo build systems that don't run the build script. -// -// With the exceptions mentioned above, the rustc-cfg emitted by the build -// script are *not* public API. - -#![warn(rust_2018_idioms, single_use_lifetimes)] - -use std::env; - -include!("no_atomic_cas.rs"); - -fn main() { - let target = match env::var("TARGET") { - Ok(target) => target, - Err(e) => { - println!( - "cargo:warning={}: unable to get TARGET environment variable: {}", - env!("CARGO_PKG_NAME"), - e - ); - return; - } - }; - - // Note that this is `no_*`, not `has_*`. This allows treating - // `cfg(target_has_atomic = "ptr")` as true when the build script doesn't - // run. This is needed for compatibility with non-cargo build systems that - // don't run the build script. - if NO_ATOMIC_CAS.contains(&&*target) { - println!("cargo:rustc-cfg=futures_no_atomic_cas"); - } - - println!("cargo:rerun-if-changed=no_atomic_cas.rs"); -} diff --git a/futures-task/no_atomic_cas.rs b/futures-task/no_atomic_cas.rs deleted file mode 120000 index 3d7380fad..000000000 --- a/futures-task/no_atomic_cas.rs +++ /dev/null @@ -1 +0,0 @@ -../no_atomic_cas.rs \ No newline at end of file diff --git a/futures-task/src/lib.rs b/futures-task/src/lib.rs index c72460744..33896d8b1 100644 --- a/futures-task/src/lib.rs +++ b/futures-task/src/lib.rs @@ -18,24 +18,24 @@ extern crate alloc; mod spawn; pub use crate::spawn::{LocalSpawn, Spawn, SpawnError}; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] mod arc_wake; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] pub use crate::arc_wake::ArcWake; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] mod waker; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] pub use crate::waker::waker; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] mod waker_ref; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] pub use crate::waker_ref::{waker_ref, WakerRef}; diff --git a/futures-task/src/spawn.rs b/futures-task/src/spawn.rs index f4e63397b..4a9a45a44 100644 --- a/futures-task/src/spawn.rs +++ b/futures-task/src/spawn.rs @@ -168,7 +168,7 @@ mod if_alloc { } } - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] impl Spawn for alloc::sync::Arc { fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { (**self).spawn_obj(future) @@ -179,7 +179,7 @@ mod if_alloc { } } - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] impl LocalSpawn for alloc::sync::Arc { fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> { (**self).spawn_local_obj(future) diff --git a/futures-test/Cargo.toml b/futures-test/Cargo.toml index 81f00e914..8499e034d 100644 --- a/futures-test/Cargo.toml +++ b/futures-test/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "futures-test" -version = "0.3.29" +version = "0.3.30" edition = "2018" rust-version = "1.56" license = "MIT OR Apache-2.0" @@ -11,13 +11,13 @@ Common utilities for testing components built off futures-rs. """ [dependencies] -futures-core = { version = "0.3.29", path = "../futures-core", default-features = false } -futures-task = { version = "0.3.29", path = "../futures-task", default-features = false } -futures-io = { version = "0.3.29", path = "../futures-io", default-features = false } -futures-util = { version = "0.3.29", path = "../futures-util", default-features = false } -futures-executor = { version = "0.3.29", path = "../futures-executor", default-features = false } -futures-sink = { version = "0.3.29", path = "../futures-sink", default-features = false } -futures-macro = { version = "=0.3.29", path = "../futures-macro", default-features = false } +futures-core = { version = "0.3.30", path = "../futures-core", default-features = false } +futures-task = { version = "0.3.30", path = "../futures-task", default-features = false } +futures-io = { version = "0.3.30", path = "../futures-io", default-features = false } +futures-util = { version = "0.3.30", path = "../futures-util", default-features = false } +futures-executor = { version = "0.3.30", path = "../futures-executor", default-features = false } +futures-sink = { version = "0.3.30", path = "../futures-sink", default-features = false } +futures-macro = { version = "=0.3.30", path = "../futures-macro", default-features = false } pin-utils = { version = "0.1.0", default-features = false } pin-project = "1.0.11" diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml index 27e9e9498..dcdbce459 100644 --- a/futures-util/Cargo.toml +++ b/futures-util/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "futures-util" -version = "0.3.29" +version = "0.3.30" edition = "2018" rust-version = "1.56" license = "MIT OR Apache-2.0" @@ -35,12 +35,12 @@ write-all-vectored = ["io"] cfg-target-has-atomic = [] [dependencies] -futures-core = { path = "../futures-core", version = "0.3.29", default-features = false } -futures-task = { path = "../futures-task", version = "0.3.29", default-features = false } -futures-channel = { path = "../futures-channel", version = "0.3.29", default-features = false, features = ["std"], optional = true } -futures-io = { path = "../futures-io", version = "0.3.29", default-features = false, features = ["std"], optional = true } -futures-sink = { path = "../futures-sink", version = "0.3.29", default-features = false, optional = true } -futures-macro = { path = "../futures-macro", version = "=0.3.29", default-features = false, optional = true } +futures-core = { path = "../futures-core", version = "0.3.30", default-features = false } +futures-task = { path = "../futures-task", version = "0.3.30", default-features = false } +futures-channel = { path = "../futures-channel", version = "0.3.30", default-features = false, features = ["std"], optional = true } +futures-io = { path = "../futures-io", version = "0.3.30", default-features = false, features = ["std"], optional = true } +futures-sink = { path = "../futures-sink", version = "0.3.30", default-features = false, optional = true } +futures-macro = { path = "../futures-macro", version = "=0.3.30", default-features = false, optional = true } slab = { version = "0.4.2", optional = true } memchr = { version = "2.2", optional = true } futures_01 = { version = "0.1.25", optional = true, package = "futures" } diff --git a/futures-util/build.rs b/futures-util/build.rs deleted file mode 100644 index 05e0496d9..000000000 --- a/futures-util/build.rs +++ /dev/null @@ -1,41 +0,0 @@ -// The rustc-cfg listed below are considered public API, but it is *unstable* -// and outside of the normal semver guarantees: -// -// - `futures_no_atomic_cas` -// Assume the target does *not* support atomic CAS operations. -// This is usually detected automatically by the build script, but you may -// need to enable it manually when building for custom targets or using -// non-cargo build systems that don't run the build script. -// -// With the exceptions mentioned above, the rustc-cfg emitted by the build -// script are *not* public API. - -#![warn(rust_2018_idioms, single_use_lifetimes)] - -use std::env; - -include!("no_atomic_cas.rs"); - -fn main() { - let target = match env::var("TARGET") { - Ok(target) => target, - Err(e) => { - println!( - "cargo:warning={}: unable to get TARGET environment variable: {}", - env!("CARGO_PKG_NAME"), - e - ); - return; - } - }; - - // Note that this is `no_*`, not `has_*`. This allows treating - // `cfg(target_has_atomic = "ptr")` as true when the build script doesn't - // run. This is needed for compatibility with non-cargo build systems that - // don't run the build script. - if NO_ATOMIC_CAS.contains(&&*target) { - println!("cargo:rustc-cfg=futures_no_atomic_cas"); - } - - println!("cargo:rerun-if-changed=no_atomic_cas.rs"); -} diff --git a/futures-util/no_atomic_cas.rs b/futures-util/no_atomic_cas.rs deleted file mode 120000 index 3d7380fad..000000000 --- a/futures-util/no_atomic_cas.rs +++ /dev/null @@ -1 +0,0 @@ -../no_atomic_cas.rs \ No newline at end of file diff --git a/futures-util/src/future/future/shared.rs b/futures-util/src/future/future/shared.rs index ecd1b426d..9ab3b4f1d 100644 --- a/futures-util/src/future/future/shared.rs +++ b/futures-util/src/future/future/shared.rs @@ -37,10 +37,6 @@ impl Clone for WeakShared { } } -// The future itself is polled behind the `Arc`, so it won't be moved -// when `Shared` is moved. -impl Unpin for Shared {} - impl fmt::Debug for Shared { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Shared") diff --git a/futures-util/src/future/join_all.rs b/futures-util/src/future/join_all.rs index 11b6f2728..79eee8dff 100644 --- a/futures-util/src/future/join_all.rs +++ b/futures-util/src/future/join_all.rs @@ -12,7 +12,7 @@ use core::task::{Context, Poll}; use super::{assert_future, MaybeDone}; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] use crate::stream::{Collect, FuturesOrdered, StreamExt}; pub(crate) fn iter_pin_mut(slice: Pin<&mut [T]>) -> impl Iterator> { @@ -31,7 +31,7 @@ where kind: JoinAllKind, } -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] pub(crate) const SMALL: usize = 30; enum JoinAllKind @@ -41,7 +41,7 @@ where Small { elems: Pin]>>, }, - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] Big { fut: Collect, Vec>, }, @@ -57,7 +57,7 @@ where JoinAllKind::Small { ref elems } => { f.debug_struct("JoinAll").field("elems", elems).finish() } - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] JoinAllKind::Big { ref fut, .. } => fmt::Debug::fmt(fut, f), } } @@ -106,7 +106,8 @@ where { let iter = iter.into_iter(); - #[cfg(futures_no_atomic_cas)] + #[cfg(target_os = "none")] + #[cfg_attr(target_os = "none", cfg(not(target_has_atomic = "ptr")))] { let kind = JoinAllKind::Small { elems: iter.map(MaybeDone::Future).collect::>().into() }; @@ -114,7 +115,7 @@ where assert_future::::Output>, _>(JoinAll { kind }) } - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] { let kind = match iter.size_hint().1 { Some(max) if max <= SMALL => JoinAllKind::Small { @@ -153,7 +154,7 @@ where Poll::Pending } } - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] JoinAllKind::Big { fut } => Pin::new(fut).poll(cx), } } diff --git a/futures-util/src/future/mod.rs b/futures-util/src/future/mod.rs index 374e36512..2d8fa4f65 100644 --- a/futures-util/src/future/mod.rs +++ b/futures-util/src/future/mod.rs @@ -111,13 +111,13 @@ pub use self::select_ok::{select_ok, SelectOk}; mod either; pub use self::either::Either; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] mod abortable; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] pub use crate::abortable::{AbortHandle, AbortRegistration, Abortable, Aborted}; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] pub use abortable::abortable; diff --git a/futures-util/src/future/try_join_all.rs b/futures-util/src/future/try_join_all.rs index 506f45065..2d6a2a02c 100644 --- a/futures-util/src/future/try_join_all.rs +++ b/futures-util/src/future/try_join_all.rs @@ -12,7 +12,7 @@ use core::task::{Context, Poll}; use super::{assert_future, join_all, IntoFuture, TryFuture, TryMaybeDone}; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] use crate::stream::{FuturesOrdered, TryCollect, TryStreamExt}; use crate::TryFutureExt; @@ -38,7 +38,7 @@ where Small { elems: Pin>]>>, }, - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] Big { fut: TryCollect>, Vec>, }, @@ -56,7 +56,7 @@ where TryJoinAllKind::Small { ref elems } => { f.debug_struct("TryJoinAll").field("elems", elems).finish() } - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] TryJoinAllKind::Big { ref fut, .. } => fmt::Debug::fmt(fut, f), } } @@ -121,7 +121,8 @@ where { let iter = iter.into_iter().map(TryFutureExt::into_future); - #[cfg(futures_no_atomic_cas)] + #[cfg(target_os = "none")] + #[cfg_attr(target_os = "none", cfg(not(target_has_atomic = "ptr")))] { let kind = TryJoinAllKind::Small { elems: iter.map(TryMaybeDone::Future).collect::>().into(), @@ -132,7 +133,7 @@ where ) } - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] { let kind = match iter.size_hint().1 { Some(max) if max <= join_all::SMALL => TryJoinAllKind::Small { @@ -184,7 +185,7 @@ where } } } - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] TryJoinAllKind::Big { fut } => Pin::new(fut).poll(cx), } } diff --git a/futures-util/src/io/copy_buf_abortable.rs b/futures-util/src/io/copy_buf_abortable.rs index fdbc4a5f0..ed22d6233 100644 --- a/futures-util/src/io/copy_buf_abortable.rs +++ b/futures-util/src/io/copy_buf_abortable.rs @@ -57,7 +57,7 @@ where } pin_project! { - /// Future for the [`copy_buf()`] function. + /// Future for the [`copy_buf_abortable()`] function. #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct CopyBufAbortable<'a, R, W: ?Sized> { diff --git a/futures-util/src/io/fill_buf.rs b/futures-util/src/io/fill_buf.rs index 6c14f71cd..45862b8e2 100644 --- a/futures-util/src/io/fill_buf.rs +++ b/futures-util/src/io/fill_buf.rs @@ -3,6 +3,7 @@ use futures_core::task::{Context, Poll}; use futures_io::AsyncBufRead; use std::io; use std::pin::Pin; +use std::slice; /// Future for the [`fill_buf`](super::AsyncBufReadExt::fill_buf) method. #[derive(Debug)] @@ -30,19 +31,12 @@ where let reader = this.reader.take().expect("Polled FillBuf after completion"); match Pin::new(&mut *reader).poll_fill_buf(cx) { - // We don't need to poll a second time for EOF, and doing so is likely to return Poll::Pending - Poll::Ready(Ok(&[])) => Poll::Ready(Ok(&[])), - // With polonius it is possible to remove this inner match and just have the correct - // lifetime of the reference inferred based on which branch is taken - Poll::Ready(Ok(_)) => match Pin::new(reader).poll_fill_buf(cx) { - Poll::Ready(Ok(slice)) => Poll::Ready(Ok(slice)), - Poll::Ready(Err(err)) => { - unreachable!("reader indicated readiness but then returned an error: {:?}", err) - } - Poll::Pending => { - unreachable!("reader indicated readiness but then returned pending") - } - }, + Poll::Ready(Ok(slice)) => { + // With polonius it is possible to remove this lifetime transmutation and just have + // the correct lifetime of the reference inferred based on which branch is taken + let slice: &'a [u8] = unsafe { slice::from_raw_parts(slice.as_ptr(), slice.len()) }; + Poll::Ready(Ok(slice)) + } Poll::Ready(Err(err)) => Poll::Ready(Err(err)), Poll::Pending => { this.reader = Some(reader); diff --git a/futures-util/src/io/split.rs b/futures-util/src/io/split.rs index 3f1b9af45..81d1e6dcb 100644 --- a/futures-util/src/io/split.rs +++ b/futures-util/src/io/split.rs @@ -31,6 +31,13 @@ pub(super) fn split(t: T) -> (ReadHalf, WriteHalf< (ReadHalf { handle: a }, WriteHalf { handle: b }) } +impl ReadHalf { + /// Checks if this `ReadHalf` and some `WriteHalf` were split from the same stream. + pub fn is_pair_of(&self, other: &WriteHalf) -> bool { + self.handle.is_pair_of(&other.handle) + } +} + impl ReadHalf { /// Attempts to put the two "halves" of a split `AsyncRead + AsyncWrite` back /// together. Succeeds only if the `ReadHalf` and `WriteHalf` are @@ -42,6 +49,13 @@ impl ReadHalf { } } +impl WriteHalf { + /// Checks if this `WriteHalf` and some `ReadHalf` were split from the same stream. + pub fn is_pair_of(&self, other: &ReadHalf) -> bool { + self.handle.is_pair_of(&other.handle) + } +} + impl WriteHalf { /// Attempts to put the two "halves" of a split `AsyncRead + AsyncWrite` back /// together. Succeeds only if the `ReadHalf` and `WriteHalf` are diff --git a/futures-util/src/lib.rs b/futures-util/src/lib.rs index 9a10c93c9..208eb73aa 100644 --- a/futures-util/src/lib.rs +++ b/futures-util/src/lib.rs @@ -329,7 +329,7 @@ pub use crate::io::{ #[cfg(feature = "alloc")] pub mod lock; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] mod abortable; diff --git a/futures-util/src/lock/bilock.rs b/futures-util/src/lock/bilock.rs index 7ddc66ad2..a89678e05 100644 --- a/futures-util/src/lock/bilock.rs +++ b/futures-util/src/lock/bilock.rs @@ -149,6 +149,11 @@ impl BiLock { BiLockAcquire { bilock: self } } + /// Returns `true` only if the other `BiLock` originated from the same call to `BiLock::new`. + pub fn is_pair_of(&self, other: &Self) -> bool { + Arc::ptr_eq(&self.arc, &other.arc) + } + /// Attempts to put the two "halves" of a `BiLock` back together and /// recover the original value. Succeeds only if the two `BiLock`s /// originated from the same call to `BiLock::new`. @@ -156,7 +161,7 @@ impl BiLock { where T: Unpin, { - if Arc::ptr_eq(&self.arc, &other.arc) { + if self.is_pair_of(&other) { drop(other); let inner = Arc::try_unwrap(self.arc) .ok() diff --git a/futures-util/src/lock/mod.rs b/futures-util/src/lock/mod.rs index 0be72717c..8ca0ff625 100644 --- a/futures-util/src/lock/mod.rs +++ b/futures-util/src/lock/mod.rs @@ -3,25 +3,25 @@ //! This module is only available when the `std` or `alloc` feature of this //! library is activated, and it is activated by default. -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(any(feature = "sink", feature = "io"))] #[cfg(not(feature = "bilock"))] pub(crate) use self::bilock::BiLock; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "bilock")] #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError}; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "std")] pub use self::mutex::{ MappedMutexGuard, Mutex, MutexGuard, MutexLockFuture, OwnedMutexGuard, OwnedMutexLockFuture, }; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(any(feature = "bilock", feature = "sink", feature = "io"))] #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] #[cfg_attr(not(feature = "bilock"), allow(unreachable_pub))] mod bilock; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "std")] mod mutex; diff --git a/futures-util/src/stream/futures_ordered.rs b/futures-util/src/stream/futures_ordered.rs index 3aaef8bde..2cc144e81 100644 --- a/futures-util/src/stream/futures_ordered.rs +++ b/futures-util/src/stream/futures_ordered.rs @@ -19,7 +19,8 @@ pin_project! { struct OrderWrapper { #[pin] data: T, // A future or a future's output - index: isize, + // Use i64 for index since isize may overflow in 32-bit targets. + index: i64, } } @@ -98,8 +99,8 @@ where pub struct FuturesOrdered { in_progress_queue: FuturesUnordered>, queued_outputs: BinaryHeap>, - next_incoming_index: isize, - next_outgoing_index: isize, + next_incoming_index: i64, + next_outgoing_index: i64, } impl Unpin for FuturesOrdered {} diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index 0dbaea908..dedf75dee 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -558,20 +558,7 @@ impl Debug for FuturesUnordered { impl FuturesUnordered { /// Clears the set, removing all futures. pub fn clear(&mut self) { - self.clear_head_all(); - - // we just cleared all the tasks, and we have &mut self, so this is safe. - unsafe { self.ready_to_run_queue.clear() }; - - self.is_terminated.store(false, Relaxed); - } - - fn clear_head_all(&mut self) { - while !self.head_all.get_mut().is_null() { - let head = *self.head_all.get_mut(); - let task = unsafe { self.unlink(head) }; - self.release_task(task); - } + *self = Self::new(); } } @@ -581,7 +568,11 @@ impl Drop for FuturesUnordered { // associated with it. At the same time though there may be tons of // wakers flying around which contain `Task` references // inside them. We'll let those naturally get deallocated. - self.clear_head_all(); + while !self.head_all.get_mut().is_null() { + let head = *self.head_all.get_mut(); + let task = unsafe { self.unlink(head) }; + self.release_task(task); + } // Note that at this point we could still have a bunch of tasks in the // ready to run queue. None of those tasks, however, have futures diff --git a/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs b/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs index 451870532..a924935d2 100644 --- a/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs +++ b/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs @@ -85,38 +85,25 @@ impl ReadyToRunQueue { pub(super) fn stub(&self) -> *const Task { Arc::as_ptr(&self.stub) } - - // Clear the queue of tasks. - // - // Note that each task has a strong reference count associated with it - // which is owned by the ready to run queue. This method just pulls out - // tasks and drops their refcounts. - // - // # Safety - // - // - All tasks **must** have had their futures dropped already (by FuturesUnordered::clear) - // - The caller **must** guarantee unique access to `self` - pub(crate) unsafe fn clear(&self) { - loop { - // SAFETY: We have the guarantee of mutual exclusion required by `dequeue`. - match self.dequeue() { - Dequeue::Empty => break, - Dequeue::Inconsistent => abort("inconsistent in drop"), - Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)), - } - } - } } impl Drop for ReadyToRunQueue { fn drop(&mut self) { // Once we're in the destructor for `Inner` we need to clear out // the ready to run queue of tasks if there's anything left in there. - - // All tasks have had their futures dropped already by the `FuturesUnordered` - // destructor above, and we have &mut self, so this is safe. + // + // Note that each task has a strong reference count associated with it + // which is owned by the ready to run queue. All tasks should have had + // their futures dropped already by the `FuturesUnordered` destructor + // above, so we're just pulling out tasks and dropping their refcounts. unsafe { - self.clear(); + loop { + match self.dequeue() { + Dequeue::Empty => break, + Dequeue::Inconsistent => abort("inconsistent in drop"), + Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)), + } + } } } } diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index 34d68a80b..2438e58b6 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -37,13 +37,13 @@ pub use self::stream::ReadyChunks; #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] pub use self::stream::Forward; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] pub use self::stream::{ BufferUnordered, Buffered, FlatMapUnordered, FlattenUnordered, ForEachConcurrent, }; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "sink")] #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] #[cfg(feature = "alloc")] @@ -51,9 +51,9 @@ pub use self::stream::{ReuniteError, SplitSink, SplitStream}; mod try_stream; pub use self::try_stream::{ - try_unfold, AndThen, ErrInto, InspectErr, InspectOk, IntoStream, MapErr, MapOk, OrElse, - TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryFold, TryForEach, TryNext, - TrySkipWhile, TryStreamExt, TryTakeWhile, TryUnfold, + try_unfold, AndThen, ErrInto, InspectErr, InspectOk, IntoStream, MapErr, MapOk, OrElse, TryAll, + TryAny, TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryFold, TryForEach, + TryNext, TrySkipWhile, TryStreamExt, TryTakeWhile, TryUnfold, }; #[cfg(feature = "io")] @@ -61,7 +61,7 @@ pub use self::try_stream::{ #[cfg(feature = "std")] pub use self::try_stream::IntoAsyncRead; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] pub use self::try_stream::{ TryBufferUnordered, TryBuffered, TryFlattenUnordered, TryForEachConcurrent, @@ -105,36 +105,36 @@ pub use self::select_with_strategy::{select_with_strategy, PollNext, SelectWithS mod unfold; pub use self::unfold::{unfold, Unfold}; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] mod futures_ordered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] pub use self::futures_ordered::FuturesOrdered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] pub mod futures_unordered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] #[doc(inline)] pub use self::futures_unordered::FuturesUnordered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] pub mod select_all; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] #[doc(inline)] pub use self::select_all::{select_all, SelectAll}; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] mod abortable; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] pub use crate::abortable::{AbortHandle, AbortRegistration, Abortable, Aborted}; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] pub use abortable::abortable; diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index 3978d188f..2da7036b2 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -181,32 +181,32 @@ mod scan; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::scan::Scan; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] mod buffer_unordered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::buffer_unordered::BufferUnordered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] mod buffered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::buffered::Buffered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] pub(crate) mod flatten_unordered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] #[allow(unreachable_pub)] pub use self::flatten_unordered::FlattenUnordered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] delegate_all!( /// Stream for the [`flat_map_unordered`](StreamExt::flat_map_unordered) method. @@ -216,20 +216,20 @@ delegate_all!( where St: Stream, U: Stream, U: Unpin, F: FnMut(St::Item) -> U ); -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] mod for_each_concurrent; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::for_each_concurrent::ForEachConcurrent; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "sink")] #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] #[cfg(feature = "alloc")] mod split; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "sink")] #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] #[cfg(feature = "alloc")] @@ -820,7 +820,7 @@ pub trait StreamExt: Stream { /// assert_eq!(output, vec![1, 2, 3, 4]); /// # }); /// ``` - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] fn flatten_unordered(self, limit: impl Into>) -> FlattenUnordered where @@ -902,7 +902,7 @@ pub trait StreamExt: Stream { /// assert_eq!(vec![1usize, 2, 2, 3, 3, 3, 4, 4, 4, 4], values); /// # }); /// ``` - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] fn flat_map_unordered( self, @@ -1142,7 +1142,7 @@ pub trait StreamExt: Stream { /// fut.await; /// # }) /// ``` - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] fn for_each_concurrent( self, @@ -1365,7 +1365,7 @@ pub trait StreamExt: Stream { /// /// This method is only available when the `std` or `alloc` feature of this /// library is activated, and it is activated by default. - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] fn buffered(self, n: usize) -> Buffered where @@ -1410,7 +1410,7 @@ pub trait StreamExt: Stream { /// assert_eq!(buffered.next().await, None); /// # Ok::<(), i32>(()) }).unwrap(); /// ``` - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] fn buffer_unordered(self, n: usize) -> BufferUnordered where @@ -1577,7 +1577,7 @@ pub trait StreamExt: Stream { /// library is activated, and it is activated by default. #[cfg(feature = "sink")] #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] fn split(self) -> (SplitSink, SplitStream) where diff --git a/futures-util/src/stream/stream/split.rs b/futures-util/src/stream/stream/split.rs index e2034e0c2..1a7fdcb38 100644 --- a/futures-util/src/stream/stream/split.rs +++ b/futures-util/src/stream/stream/split.rs @@ -15,6 +15,13 @@ pub struct SplitStream(BiLock); impl Unpin for SplitStream {} +impl SplitStream { + /// Returns `true` if the `SplitStream` and `SplitSink` originate from the same call to `StreamExt::split`. + pub fn is_pair_of(&self, other: &SplitSink) -> bool { + other.is_pair_of(&self) + } +} + impl SplitStream { /// Attempts to put the two "halves" of a split `Stream + Sink` back /// together. Succeeds only if the `SplitStream` and `SplitSink` are @@ -60,6 +67,13 @@ impl + Unpin, Item> SplitSink { } } +impl SplitSink { + /// Returns `true` if the `SplitStream` and `SplitSink` originate from the same call to `StreamExt::split`. + pub fn is_pair_of(&self, other: &SplitStream) -> bool { + self.lock.is_pair_of(&other.0) + } +} + impl, Item> SplitSink { fn poll_flush_slot( mut inner: Pin<&mut S>, @@ -142,3 +156,69 @@ impl fmt::Display for ReuniteError { #[cfg(feature = "std")] impl std::error::Error for ReuniteError {} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{sink::Sink, stream::StreamExt}; + use core::marker::PhantomData; + + struct NopStream { + phantom: PhantomData, + } + + impl Stream for NopStream { + type Item = Item; + + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + todo!() + } + } + + impl Sink for NopStream { + type Error = (); + + fn poll_ready( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + todo!() + } + + fn start_send(self: Pin<&mut Self>, _item: Item) -> Result<(), Self::Error> { + todo!() + } + + fn poll_flush( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + todo!() + } + + fn poll_close( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + todo!() + } + } + + #[test] + fn test_pairing() { + let s1 = NopStream::<()> { phantom: PhantomData }; + let (sink1, stream1) = s1.split(); + assert!(sink1.is_pair_of(&stream1)); + assert!(stream1.is_pair_of(&sink1)); + + let s2 = NopStream::<()> { phantom: PhantomData }; + let (sink2, stream2) = s2.split(); + assert!(sink2.is_pair_of(&stream2)); + assert!(stream2.is_pair_of(&sink2)); + + assert!(!sink1.is_pair_of(&stream2)); + assert!(!stream1.is_pair_of(&sink2)); + assert!(!sink2.is_pair_of(&stream1)); + assert!(!stream2.is_pair_of(&sink1)); + } +} diff --git a/futures-util/src/stream/try_stream/mod.rs b/futures-util/src/stream/try_stream/mod.rs index 5d5702f36..7b55444b3 100644 --- a/futures-util/src/stream/try_stream/mod.rs +++ b/futures-util/src/stream/try_stream/mod.rs @@ -89,10 +89,10 @@ mod try_flatten; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_flatten::TryFlatten; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] mod try_flatten_unordered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_flatten_unordered::TryFlattenUnordered; @@ -133,26 +133,26 @@ mod try_take_while; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_take_while::TryTakeWhile; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] mod try_buffer_unordered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_buffer_unordered::TryBufferUnordered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] mod try_buffered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_buffered::TryBuffered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] mod try_for_each_concurrent; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_for_each_concurrent::TryForEachConcurrent; @@ -551,7 +551,7 @@ pub trait TryStreamExt: TryStream { /// assert_eq!(Err(oneshot::Canceled), fut.await); /// # }) /// ``` - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] fn try_for_each_concurrent( self, @@ -827,7 +827,7 @@ pub trait TryStreamExt: TryStream { /// assert_eq!(values, vec![Ok(1), Ok(2), Ok(4), Err(3), Err(5)]); /// # }); /// ``` - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] fn try_flatten_unordered(self, limit: impl Into>) -> TryFlattenUnordered where @@ -1029,7 +1029,7 @@ pub trait TryStreamExt: TryStream { /// assert_eq!(buffered.next().await, Some(Err("error in the stream"))); /// # Ok::<(), Box>(()) }).unwrap(); /// ``` - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] fn try_buffer_unordered(self, n: usize) -> TryBufferUnordered where @@ -1105,7 +1105,7 @@ pub trait TryStreamExt: TryStream { /// assert_eq!(buffered.next().await, Some(Err("error in the stream"))); /// # Ok::<(), Box>(()) }).unwrap(); /// ``` - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] fn try_buffered(self, n: usize) -> TryBuffered where diff --git a/futures-util/src/stream/try_stream/try_any.rs b/futures-util/src/stream/try_stream/try_any.rs index 15adb3097..55e876be0 100644 --- a/futures-util/src/stream/try_stream/try_any.rs +++ b/futures-util/src/stream/try_stream/try_any.rs @@ -7,7 +7,7 @@ use futures_core::task::{Context, Poll}; use pin_project_lite::pin_project; pin_project! { - /// Future for the [`any`](super::StreamExt::any) method. + /// Future for the [`try_any`](super::TryStreamExt::try_any) method. #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct TryAny { #[pin] diff --git a/futures-util/src/stream/unfold.rs b/futures-util/src/stream/unfold.rs index 7d8ef6bab..2f48cccb4 100644 --- a/futures-util/src/stream/unfold.rs +++ b/futures-util/src/stream/unfold.rs @@ -36,7 +36,7 @@ use pin_project_lite::pin_project; /// let stream = stream::unfold(0, |state| async move { /// if state <= 2 { /// let next_state = state + 1; -/// let yielded = state * 2; +/// let yielded = state * 2; /// Some((yielded, next_state)) /// } else { /// None diff --git a/futures-util/src/task/mod.rs b/futures-util/src/task/mod.rs index 3ed4bfada..7a9e993e5 100644 --- a/futures-util/src/task/mod.rs +++ b/futures-util/src/task/mod.rs @@ -18,19 +18,22 @@ pub use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError, pub use futures_task::noop_waker; pub use futures_task::noop_waker_ref; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] pub use futures_task::ArcWake; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] pub use futures_task::waker; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] pub use futures_task::{waker_ref, WakerRef}; -#[cfg(any(not(futures_no_atomic_cas), feature = "portable-atomic"))] +#[cfg_attr( + target_os = "none", + cfg(any(target_has_atomic = "ptr", feature = "portable-atomic")) +)] pub use futures_core::task::__internal::AtomicWaker; mod spawn; diff --git a/futures/Cargo.toml b/futures/Cargo.toml index 96db108d3..6208f616e 100644 --- a/futures/Cargo.toml +++ b/futures/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "futures" -version = "0.3.29" +version = "0.3.30" edition = "2018" rust-version = "1.56" license = "MIT OR Apache-2.0" @@ -15,13 +15,13 @@ composability, and iterator-like interfaces. categories = ["asynchronous"] [dependencies] -futures-core = { path = "../futures-core", version = "0.3.29", default-features = false } -futures-task = { path = "../futures-task", version = "0.3.29", default-features = false } -futures-channel = { path = "../futures-channel", version = "0.3.29", default-features = false, features = ["sink"] } -futures-executor = { path = "../futures-executor", version = "0.3.29", default-features = false, optional = true } -futures-io = { path = "../futures-io", version = "0.3.29", default-features = false } -futures-sink = { path = "../futures-sink", version = "0.3.29", default-features = false } -futures-util = { path = "../futures-util", version = "0.3.29", default-features = false, features = ["sink"] } +futures-core = { path = "../futures-core", version = "0.3.30", default-features = false } +futures-task = { path = "../futures-task", version = "0.3.30", default-features = false } +futures-channel = { path = "../futures-channel", version = "0.3.30", default-features = false, features = ["sink"] } +futures-executor = { path = "../futures-executor", version = "0.3.30", default-features = false, optional = true } +futures-io = { path = "../futures-io", version = "0.3.30", default-features = false } +futures-sink = { path = "../futures-sink", version = "0.3.30", default-features = false } +futures-util = { path = "../futures-util", version = "0.3.30", default-features = false, features = ["sink"] } [dev-dependencies] futures-executor = { path = "../futures-executor", features = ["thread-pool"] } diff --git a/futures/tests/stream_futures_unordered.rs b/futures/tests/stream_futures_unordered.rs index b56828047..7bdf5432c 100644 --- a/futures/tests/stream_futures_unordered.rs +++ b/futures/tests/stream_futures_unordered.rs @@ -381,3 +381,28 @@ fn clear() { tasks.clear(); assert!(!tasks.is_terminated()); } + +// https://github.com/rust-lang/futures-rs/issues/2529#issuecomment-997290279 +#[test] +fn clear_in_loop() { + const N: usize = + if cfg!(miri) || option_env!("QEMU_LD_PREFIX").is_some() { 100 } else { 10_000 }; + futures::executor::block_on(async { + async fn task() { + let (s, r) = oneshot::channel(); + std::thread::spawn(|| { + std::thread::sleep(std::time::Duration::from_micros(100)); + let _ = s.send(()); + }); + r.await.unwrap() + } + let mut futures = FuturesUnordered::new(); + for _ in 0..N { + for _ in 0..24 { + futures.push(task()); + } + let _ = futures.next().await; + futures.clear(); + } + }); +} diff --git a/no_atomic_cas.rs b/no_atomic_cas.rs deleted file mode 100644 index 16ec628cd..000000000 --- a/no_atomic_cas.rs +++ /dev/null @@ -1,17 +0,0 @@ -// This file is @generated by no_atomic_cas.sh. -// It is not intended for manual editing. - -const NO_ATOMIC_CAS: &[&str] = &[ - "armv4t-none-eabi", - "armv5te-none-eabi", - "avr-unknown-gnu-atmega328", - "bpfeb-unknown-none", - "bpfel-unknown-none", - "msp430-none-elf", - "riscv32i-unknown-none-elf", - "riscv32im-unknown-none-elf", - "riscv32imc-unknown-none-elf", - "thumbv4t-none-eabi", - "thumbv5te-none-eabi", - "thumbv6m-none-eabi", -];