From a394567b7abafe9f6d4e5cb7423456dad0dcf896 Mon Sep 17 00:00:00 2001 From: xgroleau Date: Thu, 27 Oct 2022 20:43:04 -0400 Subject: [PATCH 01/13] wip --- core/Cargo.toml | 1 + core/src/bbbuffer.rs | 8 ++++++++ 2 files changed, 9 insertions(+) diff --git a/core/Cargo.toml b/core/Cargo.toml index c1f2044..e8cb537 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -16,6 +16,7 @@ license = "MIT OR Apache-2.0" [dependencies] cortex-m = { version = "0.6.0", optional = true } +embassy-sync = { version = "0.1.0" } #, optional = true } [features] thumbv6 = ["cortex-m"] diff --git a/core/src/bbbuffer.rs b/core/src/bbbuffer.rs index 1492a55..c813563 100644 --- a/core/src/bbbuffer.rs +++ b/core/src/bbbuffer.rs @@ -16,6 +16,8 @@ use core::{ Ordering::{AcqRel, Acquire, Release}, }, }; +use embassy_sync::waitqueue::WakerRegistration; + #[derive(Debug)] /// A backing structure for a BBQueue. Can be used to create either /// a BBQueue or a split Producer/Consumer pair @@ -47,6 +49,12 @@ pub struct BBBuffer { /// Is there an active write grant? write_in_progress: AtomicBool, + /// Read waker for async support + read_waker: WakerRegistration, + + /// Write waker for async support + write_waker: WakerRegistration, + /// Have we already split? already_split: AtomicBool, } From f0286888eaf3dc4c70ddd54f9af99f47a6dac2dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?xgroleau=20=F0=9F=90=A2?= Date: Fri, 28 Oct 2022 10:31:49 -0400 Subject: [PATCH 02/13] Basic async support --- bbqtest/Cargo.toml | 2 + bbqtest/src/async_usage.rs | 51 ++++++++++++++ bbqtest/src/lib.rs | 1 + core/src/bbbuffer.rs | 134 ++++++++++++++++++++++++++++++++++++- 4 files changed, 185 insertions(+), 3 deletions(-) create mode 100644 bbqtest/src/async_usage.rs diff --git a/bbqtest/Cargo.toml b/bbqtest/Cargo.toml index 6275e0b..8d82082 100644 --- a/bbqtest/Cargo.toml +++ b/bbqtest/Cargo.toml @@ -19,6 +19,8 @@ crossbeam-utils = "0.7" crossbeam = "0.7" heapless = "0.7" cfg-if = "0.1" +futures = "0.3" + [[bench]] name = "benches" diff --git a/bbqtest/src/async_usage.rs b/bbqtest/src/async_usage.rs new file mode 100644 index 0000000..62fcd9f --- /dev/null +++ b/bbqtest/src/async_usage.rs @@ -0,0 +1,51 @@ +#[cfg(test)] +mod tests { + use bbqueue::BBBuffer; + use futures::executor::block_on; + + #[test] + fn test_read() { + let bb: BBBuffer<6> = BBBuffer::new(); + let (mut prod, mut cons) = bb.try_split().unwrap(); + + { + let mut grant = prod.grant_exact(4).unwrap(); + let buf = grant.buf(); + buf[0] = 0xDE; + buf[1] = 0xAD; + buf[2] = 0xC0; + buf[3] = 0xDE; + grant.commit(4); + } + + let r_grant = block_on(cons.read_async()).unwrap(); + + assert_eq!(4, r_grant.len()); + assert_eq!(r_grant[0], 0xDE); + assert_eq!(r_grant[1], 0xAD); + assert_eq!(r_grant[2], 0xC0); + assert_eq!(r_grant[3], 0xDE); + } + + #[test] + fn test_write() { + let bb: BBBuffer<6> = BBBuffer::new(); + let (mut prod, mut cons) = bb.try_split().unwrap(); + + let mut w_grant = block_on(prod.grant_exact_async(4)).unwrap(); + assert_eq!(4, w_grant.len()); + w_grant[0] = 0xDE; + w_grant[1] = 0xAD; + w_grant[2] = 0xC0; + w_grant[3] = 0xDE; + w_grant.commit(4); + + let grant = cons.read().unwrap(); + let rx_buf = grant.buf(); + assert_eq!(4, rx_buf.len()); + assert_eq!(rx_buf[0], 0xDE); + assert_eq!(rx_buf[1], 0xAD); + assert_eq!(rx_buf[2], 0xC0); + assert_eq!(rx_buf[3], 0xDE); + } +} diff --git a/bbqtest/src/lib.rs b/bbqtest/src/lib.rs index 5f1843e..7cfbd1b 100644 --- a/bbqtest/src/lib.rs +++ b/bbqtest/src/lib.rs @@ -1,6 +1,7 @@ //! NOTE: this crate is really just a shim for testing //! the other no-std crate. +mod async_usage; mod framed; mod multi_thread; mod ring_around_the_senders; diff --git a/core/src/bbbuffer.rs b/core/src/bbbuffer.rs index c813563..b1d52b6 100644 --- a/core/src/bbbuffer.rs +++ b/core/src/bbbuffer.rs @@ -5,9 +5,11 @@ use crate::{ use core::{ cell::UnsafeCell, cmp::min, + future::Future, marker::PhantomData, mem::{forget, transmute, MaybeUninit}, ops::{Deref, DerefMut}, + pin::Pin, ptr::NonNull, result::Result as CoreResult, slice::from_raw_parts_mut, @@ -15,6 +17,7 @@ use core::{ AtomicBool, AtomicUsize, Ordering::{AcqRel, Acquire, Release}, }, + task::{Context, Poll}, }; use embassy_sync::waitqueue::WakerRegistration; @@ -49,14 +52,14 @@ pub struct BBBuffer { /// Is there an active write grant? write_in_progress: AtomicBool, + /// Have we already split? + already_split: AtomicBool, + /// Read waker for async support read_waker: WakerRegistration, /// Write waker for async support write_waker: WakerRegistration, - - /// Have we already split? - already_split: AtomicBool, } unsafe impl Sync for BBBuffer {} @@ -285,6 +288,12 @@ impl BBBuffer { /// We haven't split at the start already_split: AtomicBool::new(false), + + /// Shared between reader and writer + read_waker: WakerRegistration::new(), + + /// Shared between reader and writer + write_waker: WakerRegistration::new(), } } } @@ -515,6 +524,19 @@ impl<'a, const N: usize> Producer<'a, N> { to_commit: 0, }) } + + /// Async version of [Self::grant_exact] + pub fn grant_exact_async(&'_ mut self, sz: usize) -> GrantExactFuture<'a, '_, N> { + GrantExactFuture { prod: self, sz } + } + + /// Async version of [Self::grant_max_remaining] + pub fn grant_max_remaining_async( + &'_ mut self, + sz: usize, + ) -> GrantMaxRemainingFuture<'a, '_, N> { + GrantMaxRemainingFuture { prod: self, sz } + } } /// `Consumer` is the primary interface for reading data from a `BBBuffer`. @@ -660,6 +682,16 @@ impl<'a, const N: usize> Consumer<'a, N> { to_release: 0, }) } + + /// Async version of [Self::read] + pub fn read_async<'b>(&'b mut self) -> GrantReadFuture<'a, 'b, N> { + GrantReadFuture { cons: self } + } + + /// Async version of [Self::split_read] + pub fn split_read_async<'b>(&'b mut self) -> GrantSplitReadFuture<'a, 'b, N> { + GrantSplitReadFuture { cons: self } + } } impl BBBuffer { @@ -849,6 +881,7 @@ impl<'a, const N: usize> GrantW<'a, N> { // Allow subsequent grants inner.write_in_progress.store(false, Release); + unsafe { self.bbq.as_mut().read_waker.wake() }; } /// Configures the amount of bytes to be commited on drop. @@ -955,6 +988,7 @@ impl<'a, const N: usize> GrantR<'a, N> { let _ = atomic::fetch_add(&inner.read, used, Release); inner.read_in_progress.store(false, Release); + unsafe { self.bbq.as_mut().write_waker.wake() }; } /// Configures the amount of bytes to be released on drop. @@ -1103,6 +1137,100 @@ impl<'a, const N: usize> DerefMut for GrantR<'a, N> { } } +/// Future returned [Producer::grant_exact_async] +pub struct GrantExactFuture<'a, 'b, const N: usize> { + prod: &'b mut Producer<'a, N>, + sz: usize, +} + +impl<'a, 'b, const N: usize> Future for GrantExactFuture<'a, 'b, N> { + type Output = Result>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let sz = self.sz; + + match self.prod.grant_exact(sz) { + Ok(grant) => Poll::Ready(Ok(grant)), + Err(e) => match e { + Error::GrantInProgress => { + unsafe { self.prod.bbq.as_mut().write_waker.register(cx.waker()) }; + Poll::Pending + } + _ => Poll::Ready(Err(e)), + }, + } + } +} + +/// Future returned [Producer::grant_max_remaining_async] +pub struct GrantMaxRemainingFuture<'a, 'b, const N: usize> { + prod: &'b mut Producer<'a, N>, + sz: usize, +} + +impl<'a, 'b, const N: usize> Future for GrantMaxRemainingFuture<'a, 'b, N> { + type Output = Result>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let sz = self.sz; + + match self.prod.grant_max_remaining(sz) { + Ok(grant) => Poll::Ready(Ok(grant)), + Err(e) => match e { + Error::GrantInProgress | Error::InsufficientSize => { + unsafe { self.prod.bbq.as_mut().write_waker.register(cx.waker()) }; + Poll::Pending + } + _ => Poll::Ready(Err(e)), + }, + } + } +} + +/// Future returned [Consumer::read] +pub struct GrantReadFuture<'a, 'b, const N: usize> { + cons: &'b mut Consumer<'a, N>, +} + +impl<'a, 'b, const N: usize> Future for GrantReadFuture<'a, 'b, N> { + type Output = Result>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.cons.read() { + Ok(grant) => Poll::Ready(Ok(grant)), + Err(e) => match e { + Error::InsufficientSize | Error::GrantInProgress => { + unsafe { self.cons.bbq.as_mut().read_waker.register(cx.waker()) }; + Poll::Pending + } + _ => Poll::Ready(Err(e)), + }, + } + } +} + +/// Future returned [Consumer::split_read] +pub struct GrantSplitReadFuture<'a, 'b, const N: usize> { + cons: &'b mut Consumer<'a, N>, +} + +impl<'a, 'b, const N: usize> Future for GrantSplitReadFuture<'a, 'b, N> { + type Output = Result>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.cons.split_read() { + Ok(grant) => Poll::Ready(Ok(grant)), + Err(e) => match e { + Error::InsufficientSize | Error::GrantInProgress => { + unsafe { self.cons.bbq.as_mut().read_waker.register(cx.waker()) }; + Poll::Pending + } + _ => Poll::Ready(Err(e)), + }, + } + } +} + #[cfg(feature = "thumbv6")] mod atomic { use core::sync::atomic::{ From e93a9790243a141e3d8cc13a81b554cfd7ff62ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?xgroleau=20=F0=9F=90=A2?= Date: Fri, 28 Oct 2022 10:49:45 -0400 Subject: [PATCH 03/13] added exection order test --- bbqtest/src/async_usage.rs | 35 ++++++++++++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/bbqtest/src/async_usage.rs b/bbqtest/src/async_usage.rs index 62fcd9f..f8ff798 100644 --- a/bbqtest/src/async_usage.rs +++ b/bbqtest/src/async_usage.rs @@ -1,7 +1,9 @@ #[cfg(test)] mod tests { use bbqueue::BBBuffer; - use futures::executor::block_on; + use futures::{executor::block_on, future::join}; + + use crate::async_usage; #[test] fn test_read() { @@ -48,4 +50,35 @@ mod tests { assert_eq!(rx_buf[2], 0xC0); assert_eq!(rx_buf[3], 0xDE); } + + #[test] + fn test_read_after_write() { + let bb: BBBuffer<6> = BBBuffer::new(); + let (mut prod, mut cons) = bb.try_split().unwrap(); + + let read_fut = async { + let r_grant = cons.read_async().await.unwrap(); + r_grant.release(4); + let time = std::time::Instant::now(); // TODO: Remove time dependence in test + #[cfg(feature = "verbose")] + println!("Read completed at {:?}", time); + time + }; + + let write_fut = async { + let mut w_grant = prod.grant_exact_async(4).await.unwrap(); + w_grant[0] = 0xDE; + w_grant[1] = 0xAD; + w_grant[2] = 0xC0; + w_grant[3] = 0xDE; + w_grant.commit(4); + let time = std::time::Instant::now(); // TODO: Remove time dependence in test + #[cfg(feature = "verbose")] + println!("Write completed at {:?}", time); + time + }; + + let (r_time, w_time) = block_on(join(read_fut, write_fut)); + assert!(r_time > w_time) + } } From 318a07cdd91ed2c7ad9d2f1bf77c37b4b677d7f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?xgroleau=20=F0=9F=90=A2?= Date: Fri, 28 Oct 2022 11:30:40 -0400 Subject: [PATCH 04/13] Removed unused import --- bbqtest/src/async_usage.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/bbqtest/src/async_usage.rs b/bbqtest/src/async_usage.rs index f8ff798..868ee1f 100644 --- a/bbqtest/src/async_usage.rs +++ b/bbqtest/src/async_usage.rs @@ -3,8 +3,6 @@ mod tests { use bbqueue::BBBuffer; use futures::{executor::block_on, future::join}; - use crate::async_usage; - #[test] fn test_read() { let bb: BBBuffer<6> = BBBuffer::new(); From 63adbae66ad99e93a024a454737cc089f4bc80e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?xgroleau=20=F0=9F=90=A2?= Date: Fri, 28 Oct 2022 11:31:09 -0400 Subject: [PATCH 05/13] removed optional --- core/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index e8cb537..dabdbcd 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -16,7 +16,7 @@ license = "MIT OR Apache-2.0" [dependencies] cortex-m = { version = "0.6.0", optional = true } -embassy-sync = { version = "0.1.0" } #, optional = true } +embassy-sync = "0.1.0" [features] thumbv6 = ["cortex-m"] From 91362d6c9cec6aa9a52697a9ed73859171f08cbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?xgroleau=20=F0=9F=90=A2?= Date: Mon, 31 Oct 2022 08:46:56 -0400 Subject: [PATCH 06/13] Remove unsound grant exact async api --- bbqtest/src/async_usage.rs | 6 +++--- core/src/bbbuffer.rs | 44 ++++++++------------------------------ 2 files changed, 12 insertions(+), 38 deletions(-) diff --git a/bbqtest/src/async_usage.rs b/bbqtest/src/async_usage.rs index 868ee1f..7629938 100644 --- a/bbqtest/src/async_usage.rs +++ b/bbqtest/src/async_usage.rs @@ -9,7 +9,7 @@ mod tests { let (mut prod, mut cons) = bb.try_split().unwrap(); { - let mut grant = prod.grant_exact(4).unwrap(); + let mut grant = prod.grant_max_remaining(4).unwrap(); let buf = grant.buf(); buf[0] = 0xDE; buf[1] = 0xAD; @@ -32,7 +32,7 @@ mod tests { let bb: BBBuffer<6> = BBBuffer::new(); let (mut prod, mut cons) = bb.try_split().unwrap(); - let mut w_grant = block_on(prod.grant_exact_async(4)).unwrap(); + let mut w_grant = block_on(prod.grant_max_remaining_async(4)).unwrap(); assert_eq!(4, w_grant.len()); w_grant[0] = 0xDE; w_grant[1] = 0xAD; @@ -64,7 +64,7 @@ mod tests { }; let write_fut = async { - let mut w_grant = prod.grant_exact_async(4).await.unwrap(); + let mut w_grant = prod.grant_max_remaining_async(4).await.unwrap(); w_grant[0] = 0xDE; w_grant[1] = 0xAD; w_grant[2] = 0xC0; diff --git a/core/src/bbbuffer.rs b/core/src/bbbuffer.rs index b1d52b6..02addb1 100644 --- a/core/src/bbbuffer.rs +++ b/core/src/bbbuffer.rs @@ -525,11 +525,6 @@ impl<'a, const N: usize> Producer<'a, N> { }) } - /// Async version of [Self::grant_exact] - pub fn grant_exact_async(&'_ mut self, sz: usize) -> GrantExactFuture<'a, '_, N> { - GrantExactFuture { prod: self, sz } - } - /// Async version of [Self::grant_max_remaining] pub fn grant_max_remaining_async( &'_ mut self, @@ -881,7 +876,9 @@ impl<'a, const N: usize> GrantW<'a, N> { // Allow subsequent grants inner.write_in_progress.store(false, Release); - unsafe { self.bbq.as_mut().read_waker.wake() }; + unsafe { + self.bbq.as_mut().read_waker.wake(); // Notify new data + }; } /// Configures the amount of bytes to be commited on drop. @@ -988,7 +985,9 @@ impl<'a, const N: usize> GrantR<'a, N> { let _ = atomic::fetch_add(&inner.read, used, Release); inner.read_in_progress.store(false, Release); - unsafe { self.bbq.as_mut().write_waker.wake() }; + unsafe { + self.bbq.as_mut().write_waker.wake(); // Notify new free space + }; } /// Configures the amount of bytes to be released on drop. @@ -1137,31 +1136,6 @@ impl<'a, const N: usize> DerefMut for GrantR<'a, N> { } } -/// Future returned [Producer::grant_exact_async] -pub struct GrantExactFuture<'a, 'b, const N: usize> { - prod: &'b mut Producer<'a, N>, - sz: usize, -} - -impl<'a, 'b, const N: usize> Future for GrantExactFuture<'a, 'b, N> { - type Output = Result>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let sz = self.sz; - - match self.prod.grant_exact(sz) { - Ok(grant) => Poll::Ready(Ok(grant)), - Err(e) => match e { - Error::GrantInProgress => { - unsafe { self.prod.bbq.as_mut().write_waker.register(cx.waker()) }; - Poll::Pending - } - _ => Poll::Ready(Err(e)), - }, - } - } -} - /// Future returned [Producer::grant_max_remaining_async] pub struct GrantMaxRemainingFuture<'a, 'b, const N: usize> { prod: &'b mut Producer<'a, N>, @@ -1177,7 +1151,7 @@ impl<'a, 'b, const N: usize> Future for GrantMaxRemainingFuture<'a, 'b, N> { match self.prod.grant_max_remaining(sz) { Ok(grant) => Poll::Ready(Ok(grant)), Err(e) => match e { - Error::GrantInProgress | Error::InsufficientSize => { + Error::InsufficientSize => { unsafe { self.prod.bbq.as_mut().write_waker.register(cx.waker()) }; Poll::Pending } @@ -1199,7 +1173,7 @@ impl<'a, 'b, const N: usize> Future for GrantReadFuture<'a, 'b, N> { match self.cons.read() { Ok(grant) => Poll::Ready(Ok(grant)), Err(e) => match e { - Error::InsufficientSize | Error::GrantInProgress => { + Error::InsufficientSize => { unsafe { self.cons.bbq.as_mut().read_waker.register(cx.waker()) }; Poll::Pending } @@ -1221,7 +1195,7 @@ impl<'a, 'b, const N: usize> Future for GrantSplitReadFuture<'a, 'b, N> { match self.cons.split_read() { Ok(grant) => Poll::Ready(Ok(grant)), Err(e) => match e { - Error::InsufficientSize | Error::GrantInProgress => { + Error::InsufficientSize => { unsafe { self.cons.bbq.as_mut().read_waker.register(cx.waker()) }; Poll::Pending } From 00ed22de8613c152e9c6e4b49e646130fe1ebb0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?xgroleau=20=F0=9F=90=A2?= Date: Mon, 31 Oct 2022 08:46:56 -0400 Subject: [PATCH 07/13] Revert "Remove unsound grant exact async api" This reverts commit 91362d6c9cec6aa9a52697a9ed73859171f08cbf. --- bbqtest/src/async_usage.rs | 6 +++--- core/src/bbbuffer.rs | 44 ++++++++++++++++++++++++++++++-------- 2 files changed, 38 insertions(+), 12 deletions(-) diff --git a/bbqtest/src/async_usage.rs b/bbqtest/src/async_usage.rs index 7629938..868ee1f 100644 --- a/bbqtest/src/async_usage.rs +++ b/bbqtest/src/async_usage.rs @@ -9,7 +9,7 @@ mod tests { let (mut prod, mut cons) = bb.try_split().unwrap(); { - let mut grant = prod.grant_max_remaining(4).unwrap(); + let mut grant = prod.grant_exact(4).unwrap(); let buf = grant.buf(); buf[0] = 0xDE; buf[1] = 0xAD; @@ -32,7 +32,7 @@ mod tests { let bb: BBBuffer<6> = BBBuffer::new(); let (mut prod, mut cons) = bb.try_split().unwrap(); - let mut w_grant = block_on(prod.grant_max_remaining_async(4)).unwrap(); + let mut w_grant = block_on(prod.grant_exact_async(4)).unwrap(); assert_eq!(4, w_grant.len()); w_grant[0] = 0xDE; w_grant[1] = 0xAD; @@ -64,7 +64,7 @@ mod tests { }; let write_fut = async { - let mut w_grant = prod.grant_max_remaining_async(4).await.unwrap(); + let mut w_grant = prod.grant_exact_async(4).await.unwrap(); w_grant[0] = 0xDE; w_grant[1] = 0xAD; w_grant[2] = 0xC0; diff --git a/core/src/bbbuffer.rs b/core/src/bbbuffer.rs index 02addb1..b1d52b6 100644 --- a/core/src/bbbuffer.rs +++ b/core/src/bbbuffer.rs @@ -525,6 +525,11 @@ impl<'a, const N: usize> Producer<'a, N> { }) } + /// Async version of [Self::grant_exact] + pub fn grant_exact_async(&'_ mut self, sz: usize) -> GrantExactFuture<'a, '_, N> { + GrantExactFuture { prod: self, sz } + } + /// Async version of [Self::grant_max_remaining] pub fn grant_max_remaining_async( &'_ mut self, @@ -876,9 +881,7 @@ impl<'a, const N: usize> GrantW<'a, N> { // Allow subsequent grants inner.write_in_progress.store(false, Release); - unsafe { - self.bbq.as_mut().read_waker.wake(); // Notify new data - }; + unsafe { self.bbq.as_mut().read_waker.wake() }; } /// Configures the amount of bytes to be commited on drop. @@ -985,9 +988,7 @@ impl<'a, const N: usize> GrantR<'a, N> { let _ = atomic::fetch_add(&inner.read, used, Release); inner.read_in_progress.store(false, Release); - unsafe { - self.bbq.as_mut().write_waker.wake(); // Notify new free space - }; + unsafe { self.bbq.as_mut().write_waker.wake() }; } /// Configures the amount of bytes to be released on drop. @@ -1136,6 +1137,31 @@ impl<'a, const N: usize> DerefMut for GrantR<'a, N> { } } +/// Future returned [Producer::grant_exact_async] +pub struct GrantExactFuture<'a, 'b, const N: usize> { + prod: &'b mut Producer<'a, N>, + sz: usize, +} + +impl<'a, 'b, const N: usize> Future for GrantExactFuture<'a, 'b, N> { + type Output = Result>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let sz = self.sz; + + match self.prod.grant_exact(sz) { + Ok(grant) => Poll::Ready(Ok(grant)), + Err(e) => match e { + Error::GrantInProgress => { + unsafe { self.prod.bbq.as_mut().write_waker.register(cx.waker()) }; + Poll::Pending + } + _ => Poll::Ready(Err(e)), + }, + } + } +} + /// Future returned [Producer::grant_max_remaining_async] pub struct GrantMaxRemainingFuture<'a, 'b, const N: usize> { prod: &'b mut Producer<'a, N>, @@ -1151,7 +1177,7 @@ impl<'a, 'b, const N: usize> Future for GrantMaxRemainingFuture<'a, 'b, N> { match self.prod.grant_max_remaining(sz) { Ok(grant) => Poll::Ready(Ok(grant)), Err(e) => match e { - Error::InsufficientSize => { + Error::GrantInProgress | Error::InsufficientSize => { unsafe { self.prod.bbq.as_mut().write_waker.register(cx.waker()) }; Poll::Pending } @@ -1173,7 +1199,7 @@ impl<'a, 'b, const N: usize> Future for GrantReadFuture<'a, 'b, N> { match self.cons.read() { Ok(grant) => Poll::Ready(Ok(grant)), Err(e) => match e { - Error::InsufficientSize => { + Error::InsufficientSize | Error::GrantInProgress => { unsafe { self.cons.bbq.as_mut().read_waker.register(cx.waker()) }; Poll::Pending } @@ -1195,7 +1221,7 @@ impl<'a, 'b, const N: usize> Future for GrantSplitReadFuture<'a, 'b, N> { match self.cons.split_read() { Ok(grant) => Poll::Ready(Ok(grant)), Err(e) => match e { - Error::InsufficientSize => { + Error::InsufficientSize | Error::GrantInProgress => { unsafe { self.cons.bbq.as_mut().read_waker.register(cx.waker()) }; Poll::Pending } From 6c6ed17aca93d4ae382014ecaa3bdae13b5b1907 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?xgroleau=20=F0=9F=90=A2?= Date: Mon, 31 Oct 2022 09:58:17 -0400 Subject: [PATCH 08/13] Added frame api --- core/src/bbbuffer.rs | 8 ++++++-- core/src/framed.rs | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 2 deletions(-) diff --git a/core/src/bbbuffer.rs b/core/src/bbbuffer.rs index b1d52b6..85b1489 100644 --- a/core/src/bbbuffer.rs +++ b/core/src/bbbuffer.rs @@ -1147,6 +1147,10 @@ impl<'a, 'b, const N: usize> Future for GrantExactFuture<'a, 'b, N> { type Output = Result>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if self.sz > N { + return Poll::Ready(Err(Error::InsufficientSize)); + } + let sz = self.sz; match self.prod.grant_exact(sz) { @@ -1187,7 +1191,7 @@ impl<'a, 'b, const N: usize> Future for GrantMaxRemainingFuture<'a, 'b, N> { } } -/// Future returned [Consumer::read] +/// Future returned [Consumer::read_async] pub struct GrantReadFuture<'a, 'b, const N: usize> { cons: &'b mut Consumer<'a, N>, } @@ -1209,7 +1213,7 @@ impl<'a, 'b, const N: usize> Future for GrantReadFuture<'a, 'b, N> { } } -/// Future returned [Consumer::split_read] +/// Future returned [Consumer::split_read_async] pub struct GrantSplitReadFuture<'a, 'b, const N: usize> { cons: &'b mut Consumer<'a, N>, } diff --git a/core/src/framed.rs b/core/src/framed.rs index f0d4449..4855603 100644 --- a/core/src/framed.rs +++ b/core/src/framed.rs @@ -99,6 +99,15 @@ impl<'a, const N: usize> FrameProducer<'a, N> { hdr_len: hdr_len as u8, }) } + + /// Async version of [Self::grant] + pub async fn grant_async(&mut self, max_sz: usize) -> Result> { + let hdr_len = encoded_len(max_sz); + Ok(FrameGrantW { + grant_w: self.producer.grant_exact_async(max_sz + hdr_len).await?, + hdr_len: hdr_len as u8, + }) + } } /// A consumer of Framed data @@ -132,6 +141,32 @@ impl<'a, const N: usize> FrameConsumer<'a, N> { Some(FrameGrantR { grant_r, hdr_len }) } + + /// Async version of [Self::read] + pub async fn read_async(&mut self) -> Result> { + // Get all available bytes. We never wrap a frame around, + // so if a header is available, the whole frame will be. + let mut grant_r = self.consumer.read_async().await?; + + // Additionally, we never commit less than a full frame with + // a header, so if we have ANY data, we'll have a full header + // and frame. `Consumer::read` will return an Error when + // there are 0 bytes available. + + // The header consists of a single usize, encoded in native + // endianess order + let frame_len = decode_usize(&grant_r); + let hdr_len = decoded_len(grant_r[0]); + let total_len = frame_len + hdr_len; + let hdr_len = hdr_len as u8; + + debug_assert!(grant_r.len() >= total_len); + + // Reduce the grant down to the size of the frame with a header + grant_r.shrink(total_len); + + Ok(FrameGrantR { grant_r, hdr_len }) + } } /// A write grant for a single frame From 4ef40593338efd93d0dc86df38b8e3b9407c4504 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?xgroleau=20=F0=9F=90=A2?= Date: Mon, 31 Oct 2022 10:40:15 -0400 Subject: [PATCH 09/13] remove dep on embassy and added frame test --- bbqtest/src/async_framed.rs | 175 ++++++++++++++++++++++++++++++++++++ bbqtest/src/lib.rs | 1 + core/Cargo.toml | 1 - core/src/bbbuffer.rs | 50 ++++++++--- 4 files changed, 214 insertions(+), 13 deletions(-) create mode 100644 bbqtest/src/async_framed.rs diff --git a/bbqtest/src/async_framed.rs b/bbqtest/src/async_framed.rs new file mode 100644 index 0000000..fb35aef --- /dev/null +++ b/bbqtest/src/async_framed.rs @@ -0,0 +1,175 @@ +#[cfg(test)] +mod tests { + + use bbqueue::BBBuffer; + use futures::executor::block_on; + + #[test] + fn frame_wrong_size() { + block_on(async { + let bb: BBBuffer<256> = BBBuffer::new(); + let (mut prod, mut cons) = bb.try_split_framed().unwrap(); + + // Create largeish grants + let mut wgr = prod.grant(127).unwrap(); + for (i, by) in wgr.iter_mut().enumerate() { + *by = i as u8; + } + // Note: In debug mode, this hits a debug_assert + wgr.commit(256); + + let rgr = cons.read().unwrap(); + assert_eq!(rgr.len(), 127); + for (i, by) in rgr.iter().enumerate() { + assert_eq!((i as u8), *by); + } + rgr.release(); + }); + } + + #[test] + fn full_size() { + block_on(async { + let bb: BBBuffer<256> = BBBuffer::new(); + let (mut prod, mut cons) = bb.try_split_framed().unwrap(); + let mut ctr = 0; + + for _ in 0..10_000 { + // Create largeish grants + if let Ok(mut wgr) = prod.grant(127) { + ctr += 1; + for (i, by) in wgr.iter_mut().enumerate() { + *by = i as u8; + } + wgr.commit(127); + + let rgr = cons.read().unwrap(); + assert_eq!(rgr.len(), 127); + for (i, by) in rgr.iter().enumerate() { + assert_eq!((i as u8), *by); + } + rgr.release(); + } else { + // Create smallish grants + let mut wgr = prod.grant(1).unwrap(); + for (i, by) in wgr.iter_mut().enumerate() { + *by = i as u8; + } + wgr.commit(1); + + let rgr = cons.read().unwrap(); + assert_eq!(rgr.len(), 1); + for (i, by) in rgr.iter().enumerate() { + assert_eq!((i as u8), *by); + } + rgr.release(); + }; + } + + assert!(ctr > 1); + }); + } + + #[test] + fn frame_overcommit() { + block_on(async { + let bb: BBBuffer<256> = BBBuffer::new(); + let (mut prod, mut cons) = bb.try_split_framed().unwrap(); + + // Create largeish grants + let mut wgr = prod.grant(128).unwrap(); + for (i, by) in wgr.iter_mut().enumerate() { + *by = i as u8; + } + wgr.commit(255); + + let mut wgr = prod.grant(64).unwrap(); + for (i, by) in wgr.iter_mut().enumerate() { + *by = (i as u8) + 128; + } + wgr.commit(127); + + let rgr = cons.read().unwrap(); + assert_eq!(rgr.len(), 128); + rgr.release(); + + let rgr = cons.read().unwrap(); + assert_eq!(rgr.len(), 64); + rgr.release(); + }); + } + + #[test] + fn frame_undercommit() { + block_on(async { + let bb: BBBuffer<512> = BBBuffer::new(); + let (mut prod, mut cons) = bb.try_split_framed().unwrap(); + + for _ in 0..100_000 { + // Create largeish grants + let mut wgr = prod.grant(128).unwrap(); + for (i, by) in wgr.iter_mut().enumerate() { + *by = i as u8; + } + wgr.commit(13); + + let mut wgr = prod.grant(64).unwrap(); + for (i, by) in wgr.iter_mut().enumerate() { + *by = (i as u8) + 128; + } + wgr.commit(7); + + let mut wgr = prod.grant(32).unwrap(); + for (i, by) in wgr.iter_mut().enumerate() { + *by = (i as u8) + 192; + } + wgr.commit(0); + + let rgr = cons.read().unwrap(); + assert_eq!(rgr.len(), 13); + rgr.release(); + + let rgr = cons.read().unwrap(); + assert_eq!(rgr.len(), 7); + rgr.release(); + + let rgr = cons.read().unwrap(); + assert_eq!(rgr.len(), 0); + rgr.release(); + } + }); + } + + #[test] + fn frame_auto_commit_release() { + block_on(async { + let bb: BBBuffer<256> = BBBuffer::new(); + let (mut prod, mut cons) = bb.try_split_framed().unwrap(); + + for _ in 0..100 { + { + let mut wgr = prod.grant(64).unwrap(); + wgr.to_commit(64); + for (i, by) in wgr.iter_mut().enumerate() { + *by = i as u8; + } + // drop + } + + { + let mut rgr = cons.read().unwrap(); + rgr.auto_release(true); + let rgr = rgr; + + for (i, by) in rgr.iter().enumerate() { + assert_eq!(*by, i as u8); + } + assert_eq!(rgr.len(), 64); + // drop + } + } + + assert!(cons.read().is_none()); + }); + } +} diff --git a/bbqtest/src/lib.rs b/bbqtest/src/lib.rs index 7cfbd1b..5ba5238 100644 --- a/bbqtest/src/lib.rs +++ b/bbqtest/src/lib.rs @@ -1,6 +1,7 @@ //! NOTE: this crate is really just a shim for testing //! the other no-std crate. +mod async_framed; mod async_usage; mod framed; mod multi_thread; diff --git a/core/Cargo.toml b/core/Cargo.toml index dabdbcd..c1f2044 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -16,7 +16,6 @@ license = "MIT OR Apache-2.0" [dependencies] cortex-m = { version = "0.6.0", optional = true } -embassy-sync = "0.1.0" [features] thumbv6 = ["cortex-m"] diff --git a/core/src/bbbuffer.rs b/core/src/bbbuffer.rs index 85b1489..3780092 100644 --- a/core/src/bbbuffer.rs +++ b/core/src/bbbuffer.rs @@ -17,9 +17,26 @@ use core::{ AtomicBool, AtomicUsize, Ordering::{AcqRel, Acquire, Release}, }, - task::{Context, Poll}, + task::{Context, Poll, Waker}, }; -use embassy_sync::waitqueue::WakerRegistration; + +fn replace_waker(opt_w: &mut Option, new: &Waker) { + match opt_w.take() { + Some(prev) => { + // No need to clone if they wake the same task. + if prev.will_wake(&new) { + } + // Wake the previous waker and replace it + else { + prev.wake(); + opt_w.replace(new.clone()); + } + } + None => { + opt_w.replace(new.clone()); + } + } +} #[derive(Debug)] /// A backing structure for a BBQueue. Can be used to create either @@ -56,10 +73,10 @@ pub struct BBBuffer { already_split: AtomicBool, /// Read waker for async support - read_waker: WakerRegistration, + read_waker: Option, /// Write waker for async support - write_waker: WakerRegistration, + write_waker: Option, } unsafe impl Sync for BBBuffer {} @@ -290,10 +307,10 @@ impl BBBuffer { already_split: AtomicBool::new(false), /// Shared between reader and writer - read_waker: WakerRegistration::new(), + read_waker: None, /// Shared between reader and writer - write_waker: WakerRegistration::new(), + write_waker: None, } } } @@ -881,7 +898,12 @@ impl<'a, const N: usize> GrantW<'a, N> { // Allow subsequent grants inner.write_in_progress.store(false, Release); - unsafe { self.bbq.as_mut().read_waker.wake() }; + + unsafe { + if let Some(waker) = self.bbq.as_mut().read_waker.take() { + waker.wake() + } + }; } /// Configures the amount of bytes to be commited on drop. @@ -988,7 +1010,11 @@ impl<'a, const N: usize> GrantR<'a, N> { let _ = atomic::fetch_add(&inner.read, used, Release); inner.read_in_progress.store(false, Release); - unsafe { self.bbq.as_mut().write_waker.wake() }; + unsafe { + if let Some(waker) = self.bbq.as_mut().write_waker.take() { + waker.wake() + } + }; } /// Configures the amount of bytes to be released on drop. @@ -1157,7 +1183,7 @@ impl<'a, 'b, const N: usize> Future for GrantExactFuture<'a, 'b, N> { Ok(grant) => Poll::Ready(Ok(grant)), Err(e) => match e { Error::GrantInProgress => { - unsafe { self.prod.bbq.as_mut().write_waker.register(cx.waker()) }; + unsafe { replace_waker(&mut self.prod.bbq.as_mut().write_waker, cx.waker()) }; Poll::Pending } _ => Poll::Ready(Err(e)), @@ -1182,7 +1208,7 @@ impl<'a, 'b, const N: usize> Future for GrantMaxRemainingFuture<'a, 'b, N> { Ok(grant) => Poll::Ready(Ok(grant)), Err(e) => match e { Error::GrantInProgress | Error::InsufficientSize => { - unsafe { self.prod.bbq.as_mut().write_waker.register(cx.waker()) }; + unsafe { replace_waker(&mut self.prod.bbq.as_mut().write_waker, cx.waker()) }; Poll::Pending } _ => Poll::Ready(Err(e)), @@ -1204,7 +1230,7 @@ impl<'a, 'b, const N: usize> Future for GrantReadFuture<'a, 'b, N> { Ok(grant) => Poll::Ready(Ok(grant)), Err(e) => match e { Error::InsufficientSize | Error::GrantInProgress => { - unsafe { self.cons.bbq.as_mut().read_waker.register(cx.waker()) }; + unsafe { replace_waker(&mut self.cons.bbq.as_mut().write_waker, cx.waker()) }; Poll::Pending } _ => Poll::Ready(Err(e)), @@ -1226,7 +1252,7 @@ impl<'a, 'b, const N: usize> Future for GrantSplitReadFuture<'a, 'b, N> { Ok(grant) => Poll::Ready(Ok(grant)), Err(e) => match e { Error::InsufficientSize | Error::GrantInProgress => { - unsafe { self.cons.bbq.as_mut().read_waker.register(cx.waker()) }; + unsafe { replace_waker(&mut self.cons.bbq.as_mut().read_waker, cx.waker()) }; Poll::Pending } _ => Poll::Ready(Err(e)), From 3f8e445976c6c39b74ceb48de3c98ef8130388ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?xgroleau=20=F0=9F=90=A2?= Date: Tue, 1 Nov 2022 10:35:06 -0400 Subject: [PATCH 10/13] Added doc and tests --- bbqtest/src/async_usage.rs | 87 ++++++++++++++++++++++++++++++++++++++ core/src/bbbuffer.rs | 36 +++++++++++++--- 2 files changed, 116 insertions(+), 7 deletions(-) diff --git a/bbqtest/src/async_usage.rs b/bbqtest/src/async_usage.rs index 868ee1f..cad1247 100644 --- a/bbqtest/src/async_usage.rs +++ b/bbqtest/src/async_usage.rs @@ -1,6 +1,7 @@ #[cfg(test)] mod tests { use bbqueue::BBBuffer; + use bbqueue::Error; use futures::{executor::block_on, future::join}; #[test] @@ -57,6 +58,7 @@ mod tests { let read_fut = async { let r_grant = cons.read_async().await.unwrap(); r_grant.release(4); + let time = std::time::Instant::now(); // TODO: Remove time dependence in test #[cfg(feature = "verbose")] println!("Read completed at {:?}", time); @@ -70,6 +72,7 @@ mod tests { w_grant[2] = 0xC0; w_grant[3] = 0xDE; w_grant.commit(4); + let time = std::time::Instant::now(); // TODO: Remove time dependence in test #[cfg(feature = "verbose")] println!("Write completed at {:?}", time); @@ -79,4 +82,88 @@ mod tests { let (r_time, w_time) = block_on(join(read_fut, write_fut)); assert!(r_time > w_time) } + + #[test] + fn grant_exact_too_big() { + let bb: BBBuffer<6> = BBBuffer::new(); + let (mut prod, mut _cons) = bb.try_split().unwrap(); + let w_grant_res = block_on(async { prod.grant_exact_async(8).await }); + + assert_eq!(w_grant_res.unwrap_err(), Error::InsufficientSize); + } + + #[test] + fn grant_exact_loop() { + let bb: BBBuffer<6> = BBBuffer::new(); + let (mut prod, mut cons) = bb.try_split().unwrap(); + let w_grant = prod.grant_exact(4).unwrap(); + w_grant.commit(4); + + let read_fut = async { + let r_grant = cons.read_async().await.unwrap(); + r_grant.release(4); + + let time = std::time::Instant::now(); // TODO: Remove time dependence in test + #[cfg(feature = "verbose")] + println!("Read completed at {:?}", time); + time + }; + + let write_fut = async { + let w_grant = prod.grant_exact_async(3).await.unwrap(); + w_grant.commit(4); + + let time = std::time::Instant::now(); // TODO: Remove time dependence in test + #[cfg(feature = "verbose")] + println!("Write completed at {:?}", time); + time + }; + + let (w_time, r_time) = block_on(join(write_fut, read_fut)); + assert!(r_time < w_time); + } + + #[test] + fn grant_exact_loop_too_big() { + let bb: BBBuffer<6> = BBBuffer::new(); + let (mut prod, mut cons) = bb.try_split().unwrap(); + let w_grant = prod.grant_exact(4).unwrap(); + w_grant.commit(4); + + let read_fut = async { + let r_grant = cons.read_async().await.unwrap(); + r_grant.release(4); + }; + + let write_fut = async { + let w_grant = prod.grant_exact_async(4).await; + assert_eq!(w_grant.unwrap_err(), Error::InsufficientSize); + }; + + block_on(join(write_fut, read_fut)); + } + + #[test] + fn write_cancelled() { + let bb: BBBuffer<6> = BBBuffer::new(); + let (mut prod, mut cons) = bb.try_split().unwrap(); + let w_grant_fut = prod.grant_exact_async(6); + drop(w_grant_fut); + let r_grant = cons.read(); + assert_eq!(r_grant.unwrap_err(), Error::InsufficientSize); + } + + #[test] + fn read_cancelled() { + let bb: BBBuffer<6> = BBBuffer::new(); + let (mut prod, mut cons) = bb.try_split().unwrap(); + let w_grant = prod.grant_exact(6).unwrap(); + w_grant.commit(6); + + let r_grant_fut = cons.read_async(); + drop(r_grant_fut); + + let w_grant = prod.grant_max_remaining(4); + assert_eq!(w_grant.unwrap_err(), Error::InsufficientSize); + } } diff --git a/core/src/bbbuffer.rs b/core/src/bbbuffer.rs index 3780092..36bd175 100644 --- a/core/src/bbbuffer.rs +++ b/core/src/bbbuffer.rs @@ -542,12 +542,23 @@ impl<'a, const N: usize> Producer<'a, N> { }) } - /// Async version of [Self::grant_exact] + /// Async version of [Self::grant_exact]. + /// If the buffer can enventually provide a buffer of the requested size, the future + /// will wait for the buffer to be read so the exact buffer can be requested. + /// + /// If it's not possible to request it, an error is returned. + /// For example, given a buffer + /// [0|1|2|3|4|5|6|7|8] + /// ^ + /// Write pointer + /// We cannot request a size of size 7, since we would loop over the read pointer + /// even if the buffer is empty. In this case, an error is returned pub fn grant_exact_async(&'_ mut self, sz: usize) -> GrantExactFuture<'a, '_, N> { GrantExactFuture { prod: self, sz } } - /// Async version of [Self::grant_max_remaining] + /// Async version of [Self::grant_max_remaining]. + /// Will wait for the buffer to at least 1 byte available, as soon as it does, return the grant. pub fn grant_max_remaining_async( &'_ mut self, sz: usize, @@ -700,12 +711,14 @@ impl<'a, const N: usize> Consumer<'a, N> { }) } - /// Async version of [Self::read] + /// Async version of [Self::read]. + /// Will wait for the buffer to have data to read. When data is available, the grant is returned. pub fn read_async<'b>(&'b mut self) -> GrantReadFuture<'a, 'b, N> { GrantReadFuture { cons: self } } - /// Async version of [Self::split_read] + /// Async version of [Self::split_read]. + /// Will wait just like [Self::read_async], but returns the split grant to obtain all the available data. pub fn split_read_async<'b>(&'b mut self) -> GrantSplitReadFuture<'a, 'b, N> { GrantSplitReadFuture { cons: self } } @@ -1173,7 +1186,16 @@ impl<'a, 'b, const N: usize> Future for GrantExactFuture<'a, 'b, N> { type Output = Result>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if self.sz > N { + // Check if it's event possible to get the requested size + // Ex: + // [0|1|2|3|4|5|6|7|8] + // ^ + // Write pointer + // Check if the buffer from 6 to 8 satisfies or if the buffer from 0 to 5 does. + // If so, create the future, if not, we need the return since the future will never resolve. + // Ideally, we could just wait for all the read to complete and reset the read and write to 0, but that is currently not supported + let write = unsafe { self.prod.bbq.as_ref().write.load(Acquire) }; + if self.sz > N || (self.sz > N - write && self.sz >= write) { return Poll::Ready(Err(Error::InsufficientSize)); } @@ -1182,7 +1204,7 @@ impl<'a, 'b, const N: usize> Future for GrantExactFuture<'a, 'b, N> { match self.prod.grant_exact(sz) { Ok(grant) => Poll::Ready(Ok(grant)), Err(e) => match e { - Error::GrantInProgress => { + Error::GrantInProgress | Error::InsufficientSize => { unsafe { replace_waker(&mut self.prod.bbq.as_mut().write_waker, cx.waker()) }; Poll::Pending } @@ -1230,7 +1252,7 @@ impl<'a, 'b, const N: usize> Future for GrantReadFuture<'a, 'b, N> { Ok(grant) => Poll::Ready(Ok(grant)), Err(e) => match e { Error::InsufficientSize | Error::GrantInProgress => { - unsafe { replace_waker(&mut self.cons.bbq.as_mut().write_waker, cx.waker()) }; + unsafe { replace_waker(&mut self.cons.bbq.as_mut().read_waker, cx.waker()) }; Poll::Pending } _ => Poll::Ready(Err(e)), From 170e2d35078526c68acc5a06a19e748575df05d1 Mon Sep 17 00:00:00 2001 From: xgroleau Date: Sat, 5 Nov 2022 18:18:12 -0400 Subject: [PATCH 11/13] Wrapped waker in a struct --- core/src/bbbuffer.rs | 51 +++++++++++++------------------------------- core/src/lib.rs | 1 + core/src/waker.rs | 40 ++++++++++++++++++++++++++++++++++ 3 files changed, 56 insertions(+), 36 deletions(-) create mode 100644 core/src/waker.rs diff --git a/core/src/bbbuffer.rs b/core/src/bbbuffer.rs index 36bd175..5276b3b 100644 --- a/core/src/bbbuffer.rs +++ b/core/src/bbbuffer.rs @@ -1,5 +1,6 @@ use crate::{ framed::{FrameConsumer, FrameProducer}, + waker::WakerStorage, Error, Result, }; use core::{ @@ -17,27 +18,9 @@ use core::{ AtomicBool, AtomicUsize, Ordering::{AcqRel, Acquire, Release}, }, - task::{Context, Poll, Waker}, + task::{Context, Poll}, }; -fn replace_waker(opt_w: &mut Option, new: &Waker) { - match opt_w.take() { - Some(prev) => { - // No need to clone if they wake the same task. - if prev.will_wake(&new) { - } - // Wake the previous waker and replace it - else { - prev.wake(); - opt_w.replace(new.clone()); - } - } - None => { - opt_w.replace(new.clone()); - } - } -} - #[derive(Debug)] /// A backing structure for a BBQueue. Can be used to create either /// a BBQueue or a split Producer/Consumer pair @@ -73,10 +56,12 @@ pub struct BBBuffer { already_split: AtomicBool, /// Read waker for async support - read_waker: Option, + /// Woken up when a commit is done + read_waker: WakerStorage, /// Write waker for async support - write_waker: Option, + /// Woken up when a release is done + write_waker: WakerStorage, } unsafe impl Sync for BBBuffer {} @@ -306,11 +291,11 @@ impl BBBuffer { /// We haven't split at the start already_split: AtomicBool::new(false), - /// Shared between reader and writer - read_waker: None, + /// Shared between reader and writer. + read_waker: WakerStorage::new(), /// Shared between reader and writer - write_waker: None, + write_waker: WakerStorage::new(), } } } @@ -913,9 +898,7 @@ impl<'a, const N: usize> GrantW<'a, N> { inner.write_in_progress.store(false, Release); unsafe { - if let Some(waker) = self.bbq.as_mut().read_waker.take() { - waker.wake() - } + self.bbq.as_mut().read_waker.wake(); }; } @@ -1023,11 +1006,7 @@ impl<'a, const N: usize> GrantR<'a, N> { let _ = atomic::fetch_add(&inner.read, used, Release); inner.read_in_progress.store(false, Release); - unsafe { - if let Some(waker) = self.bbq.as_mut().write_waker.take() { - waker.wake() - } - }; + unsafe { self.bbq.as_mut().write_waker.wake() }; } /// Configures the amount of bytes to be released on drop. @@ -1205,7 +1184,7 @@ impl<'a, 'b, const N: usize> Future for GrantExactFuture<'a, 'b, N> { Ok(grant) => Poll::Ready(Ok(grant)), Err(e) => match e { Error::GrantInProgress | Error::InsufficientSize => { - unsafe { replace_waker(&mut self.prod.bbq.as_mut().write_waker, cx.waker()) }; + unsafe { self.prod.bbq.as_mut().write_waker.set(cx.waker()) }; Poll::Pending } _ => Poll::Ready(Err(e)), @@ -1230,7 +1209,7 @@ impl<'a, 'b, const N: usize> Future for GrantMaxRemainingFuture<'a, 'b, N> { Ok(grant) => Poll::Ready(Ok(grant)), Err(e) => match e { Error::GrantInProgress | Error::InsufficientSize => { - unsafe { replace_waker(&mut self.prod.bbq.as_mut().write_waker, cx.waker()) }; + unsafe { self.prod.bbq.as_mut().write_waker.set(cx.waker()) }; Poll::Pending } _ => Poll::Ready(Err(e)), @@ -1252,7 +1231,7 @@ impl<'a, 'b, const N: usize> Future for GrantReadFuture<'a, 'b, N> { Ok(grant) => Poll::Ready(Ok(grant)), Err(e) => match e { Error::InsufficientSize | Error::GrantInProgress => { - unsafe { replace_waker(&mut self.cons.bbq.as_mut().read_waker, cx.waker()) }; + unsafe { self.cons.bbq.as_mut().read_waker.set(cx.waker()) }; Poll::Pending } _ => Poll::Ready(Err(e)), @@ -1274,7 +1253,7 @@ impl<'a, 'b, const N: usize> Future for GrantSplitReadFuture<'a, 'b, N> { Ok(grant) => Poll::Ready(Ok(grant)), Err(e) => match e { Error::InsufficientSize | Error::GrantInProgress => { - unsafe { replace_waker(&mut self.cons.bbq.as_mut().read_waker, cx.waker()) }; + unsafe { self.cons.bbq.as_mut().read_waker.set(cx.waker()) }; Poll::Pending } _ => Poll::Ready(Err(e)), diff --git a/core/src/lib.rs b/core/src/lib.rs index ea2348d..4850442 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -104,6 +104,7 @@ pub use bbbuffer::*; pub mod framed; mod vusize; +mod waker; use core::result::Result as CoreResult; diff --git a/core/src/waker.rs b/core/src/waker.rs new file mode 100644 index 0000000..78b773d --- /dev/null +++ b/core/src/waker.rs @@ -0,0 +1,40 @@ +use core::task::Waker; + +/// A waker storage. Can be initialized without a waker, and a waker can be set on an eventual `poll` call. +/// The waker can be set and woken up. +#[derive(Debug)] +pub struct WakerStorage { + waker: Option, +} + +impl WakerStorage { + pub const fn new() -> Self { + WakerStorage { waker: None } + } + + /// Set the waker, will wake the previous one if one was already stored. + pub fn set(&mut self, new: &Waker) { + match self.waker.take() { + Some(prev) => { + // No need to clone if they wake the same task. + if prev.will_wake(new) { + } + // Wake the previous waker and replace it + else { + prev.wake(); + self.waker.replace(new.clone()); + } + } + None => { + self.waker.replace(new.clone()); + } + } + } + + /// Wake the waker if one is available + pub fn wake(&mut self) { + if let Some(waker) = self.waker.take() { + waker.wake() + } + } +} From 34a7d4b4064519c5837a0e4a8ff264dae63c4ac9 Mon Sep 17 00:00:00 2001 From: xgroleau Date: Sat, 5 Nov 2022 18:20:44 -0400 Subject: [PATCH 12/13] ran fmt --- bbqtest/src/async_framed.rs | 236 ++++++++++++++++++------------------ 1 file changed, 118 insertions(+), 118 deletions(-) diff --git a/bbqtest/src/async_framed.rs b/bbqtest/src/async_framed.rs index fb35aef..e2439d6 100644 --- a/bbqtest/src/async_framed.rs +++ b/bbqtest/src/async_framed.rs @@ -7,169 +7,169 @@ mod tests { #[test] fn frame_wrong_size() { block_on(async { - let bb: BBBuffer<256> = BBBuffer::new(); - let (mut prod, mut cons) = bb.try_split_framed().unwrap(); - - // Create largeish grants - let mut wgr = prod.grant(127).unwrap(); - for (i, by) in wgr.iter_mut().enumerate() { - *by = i as u8; - } - // Note: In debug mode, this hits a debug_assert - wgr.commit(256); - - let rgr = cons.read().unwrap(); - assert_eq!(rgr.len(), 127); - for (i, by) in rgr.iter().enumerate() { - assert_eq!((i as u8), *by); - } - rgr.release(); - }); - } - - #[test] - fn full_size() { - block_on(async { - let bb: BBBuffer<256> = BBBuffer::new(); - let (mut prod, mut cons) = bb.try_split_framed().unwrap(); - let mut ctr = 0; + let bb: BBBuffer<256> = BBBuffer::new(); + let (mut prod, mut cons) = bb.try_split_framed().unwrap(); - for _ in 0..10_000 { // Create largeish grants - if let Ok(mut wgr) = prod.grant(127) { - ctr += 1; - for (i, by) in wgr.iter_mut().enumerate() { - *by = i as u8; - } - wgr.commit(127); - - let rgr = cons.read().unwrap(); - assert_eq!(rgr.len(), 127); - for (i, by) in rgr.iter().enumerate() { - assert_eq!((i as u8), *by); - } - rgr.release(); - } else { - // Create smallish grants - let mut wgr = prod.grant(1).unwrap(); - for (i, by) in wgr.iter_mut().enumerate() { - *by = i as u8; - } - wgr.commit(1); - - let rgr = cons.read().unwrap(); - assert_eq!(rgr.len(), 1); - for (i, by) in rgr.iter().enumerate() { - assert_eq!((i as u8), *by); - } - rgr.release(); - }; - } + let mut wgr = prod.grant(127).unwrap(); + for (i, by) in wgr.iter_mut().enumerate() { + *by = i as u8; + } + // Note: In debug mode, this hits a debug_assert + wgr.commit(256); - assert!(ctr > 1); + let rgr = cons.read().unwrap(); + assert_eq!(rgr.len(), 127); + for (i, by) in rgr.iter().enumerate() { + assert_eq!((i as u8), *by); + } + rgr.release(); }); } #[test] - fn frame_overcommit() { + fn full_size() { block_on(async { - let bb: BBBuffer<256> = BBBuffer::new(); - let (mut prod, mut cons) = bb.try_split_framed().unwrap(); - - // Create largeish grants - let mut wgr = prod.grant(128).unwrap(); - for (i, by) in wgr.iter_mut().enumerate() { - *by = i as u8; - } - wgr.commit(255); - - let mut wgr = prod.grant(64).unwrap(); - for (i, by) in wgr.iter_mut().enumerate() { - *by = (i as u8) + 128; - } - wgr.commit(127); - - let rgr = cons.read().unwrap(); - assert_eq!(rgr.len(), 128); - rgr.release(); - - let rgr = cons.read().unwrap(); - assert_eq!(rgr.len(), 64); - rgr.release(); + let bb: BBBuffer<256> = BBBuffer::new(); + let (mut prod, mut cons) = bb.try_split_framed().unwrap(); + let mut ctr = 0; + + for _ in 0..10_000 { + // Create largeish grants + if let Ok(mut wgr) = prod.grant(127) { + ctr += 1; + for (i, by) in wgr.iter_mut().enumerate() { + *by = i as u8; + } + wgr.commit(127); + + let rgr = cons.read().unwrap(); + assert_eq!(rgr.len(), 127); + for (i, by) in rgr.iter().enumerate() { + assert_eq!((i as u8), *by); + } + rgr.release(); + } else { + // Create smallish grants + let mut wgr = prod.grant(1).unwrap(); + for (i, by) in wgr.iter_mut().enumerate() { + *by = i as u8; + } + wgr.commit(1); + + let rgr = cons.read().unwrap(); + assert_eq!(rgr.len(), 1); + for (i, by) in rgr.iter().enumerate() { + assert_eq!((i as u8), *by); + } + rgr.release(); + }; + } + + assert!(ctr > 1); }); } #[test] - fn frame_undercommit() { + fn frame_overcommit() { block_on(async { - let bb: BBBuffer<512> = BBBuffer::new(); - let (mut prod, mut cons) = bb.try_split_framed().unwrap(); + let bb: BBBuffer<256> = BBBuffer::new(); + let (mut prod, mut cons) = bb.try_split_framed().unwrap(); - for _ in 0..100_000 { // Create largeish grants let mut wgr = prod.grant(128).unwrap(); for (i, by) in wgr.iter_mut().enumerate() { *by = i as u8; } - wgr.commit(13); + wgr.commit(255); let mut wgr = prod.grant(64).unwrap(); for (i, by) in wgr.iter_mut().enumerate() { *by = (i as u8) + 128; } - wgr.commit(7); - - let mut wgr = prod.grant(32).unwrap(); - for (i, by) in wgr.iter_mut().enumerate() { - *by = (i as u8) + 192; - } - wgr.commit(0); - - let rgr = cons.read().unwrap(); - assert_eq!(rgr.len(), 13); - rgr.release(); + wgr.commit(127); let rgr = cons.read().unwrap(); - assert_eq!(rgr.len(), 7); + assert_eq!(rgr.len(), 128); rgr.release(); let rgr = cons.read().unwrap(); - assert_eq!(rgr.len(), 0); + assert_eq!(rgr.len(), 64); rgr.release(); - } }); } #[test] - fn frame_auto_commit_release() { + fn frame_undercommit() { block_on(async { - let bb: BBBuffer<256> = BBBuffer::new(); - let (mut prod, mut cons) = bb.try_split_framed().unwrap(); + let bb: BBBuffer<512> = BBBuffer::new(); + let (mut prod, mut cons) = bb.try_split_framed().unwrap(); - for _ in 0..100 { - { - let mut wgr = prod.grant(64).unwrap(); - wgr.to_commit(64); + for _ in 0..100_000 { + // Create largeish grants + let mut wgr = prod.grant(128).unwrap(); for (i, by) in wgr.iter_mut().enumerate() { *by = i as u8; } - // drop + wgr.commit(13); + + let mut wgr = prod.grant(64).unwrap(); + for (i, by) in wgr.iter_mut().enumerate() { + *by = (i as u8) + 128; + } + wgr.commit(7); + + let mut wgr = prod.grant(32).unwrap(); + for (i, by) in wgr.iter_mut().enumerate() { + *by = (i as u8) + 192; + } + wgr.commit(0); + + let rgr = cons.read().unwrap(); + assert_eq!(rgr.len(), 13); + rgr.release(); + + let rgr = cons.read().unwrap(); + assert_eq!(rgr.len(), 7); + rgr.release(); + + let rgr = cons.read().unwrap(); + assert_eq!(rgr.len(), 0); + rgr.release(); } + }); + } + + #[test] + fn frame_auto_commit_release() { + block_on(async { + let bb: BBBuffer<256> = BBBuffer::new(); + let (mut prod, mut cons) = bb.try_split_framed().unwrap(); + + for _ in 0..100 { + { + let mut wgr = prod.grant(64).unwrap(); + wgr.to_commit(64); + for (i, by) in wgr.iter_mut().enumerate() { + *by = i as u8; + } + // drop + } - { - let mut rgr = cons.read().unwrap(); - rgr.auto_release(true); - let rgr = rgr; + { + let mut rgr = cons.read().unwrap(); + rgr.auto_release(true); + let rgr = rgr; - for (i, by) in rgr.iter().enumerate() { - assert_eq!(*by, i as u8); + for (i, by) in rgr.iter().enumerate() { + assert_eq!(*by, i as u8); + } + assert_eq!(rgr.len(), 64); + // drop } - assert_eq!(rgr.len(), 64); - // drop } - } - assert!(cons.read().is_none()); + assert!(cons.read().is_none()); }); } } From 2a721183c12018d26b5e2a4aa81da073ff35cb86 Mon Sep 17 00:00:00 2001 From: xgroleau Date: Sat, 5 Nov 2022 18:23:26 -0400 Subject: [PATCH 13/13] Added explicit return --- core/src/waker.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/waker.rs b/core/src/waker.rs index 78b773d..538bbb8 100644 --- a/core/src/waker.rs +++ b/core/src/waker.rs @@ -18,6 +18,7 @@ impl WakerStorage { Some(prev) => { // No need to clone if they wake the same task. if prev.will_wake(new) { + return; } // Wake the previous waker and replace it else {