diff --git a/bbqtest/Cargo.toml b/bbqtest/Cargo.toml index 6275e0b..8d82082 100644 --- a/bbqtest/Cargo.toml +++ b/bbqtest/Cargo.toml @@ -19,6 +19,8 @@ crossbeam-utils = "0.7" crossbeam = "0.7" heapless = "0.7" cfg-if = "0.1" +futures = "0.3" + [[bench]] name = "benches" diff --git a/bbqtest/src/async_framed.rs b/bbqtest/src/async_framed.rs new file mode 100644 index 0000000..e2439d6 --- /dev/null +++ b/bbqtest/src/async_framed.rs @@ -0,0 +1,175 @@ +#[cfg(test)] +mod tests { + + use bbqueue::BBBuffer; + use futures::executor::block_on; + + #[test] + fn frame_wrong_size() { + block_on(async { + let bb: BBBuffer<256> = BBBuffer::new(); + let (mut prod, mut cons) = bb.try_split_framed().unwrap(); + + // Create largeish grants + let mut wgr = prod.grant(127).unwrap(); + for (i, by) in wgr.iter_mut().enumerate() { + *by = i as u8; + } + // Note: In debug mode, this hits a debug_assert + wgr.commit(256); + + let rgr = cons.read().unwrap(); + assert_eq!(rgr.len(), 127); + for (i, by) in rgr.iter().enumerate() { + assert_eq!((i as u8), *by); + } + rgr.release(); + }); + } + + #[test] + fn full_size() { + block_on(async { + let bb: BBBuffer<256> = BBBuffer::new(); + let (mut prod, mut cons) = bb.try_split_framed().unwrap(); + let mut ctr = 0; + + for _ in 0..10_000 { + // Create largeish grants + if let Ok(mut wgr) = prod.grant(127) { + ctr += 1; + for (i, by) in wgr.iter_mut().enumerate() { + *by = i as u8; + } + wgr.commit(127); + + let rgr = cons.read().unwrap(); + assert_eq!(rgr.len(), 127); + for (i, by) in rgr.iter().enumerate() { + assert_eq!((i as u8), *by); + } + rgr.release(); + } else { + // Create smallish grants + let mut wgr = prod.grant(1).unwrap(); + for (i, by) in wgr.iter_mut().enumerate() { + *by = i as u8; + } + wgr.commit(1); + + let rgr = cons.read().unwrap(); + assert_eq!(rgr.len(), 1); + for (i, by) in rgr.iter().enumerate() { + assert_eq!((i as u8), *by); + } + rgr.release(); + }; + } + + assert!(ctr > 1); + }); + } + + #[test] + fn frame_overcommit() { + block_on(async { + let bb: BBBuffer<256> = BBBuffer::new(); + let (mut prod, mut cons) = bb.try_split_framed().unwrap(); + + // Create largeish grants + let mut wgr = prod.grant(128).unwrap(); + for (i, by) in wgr.iter_mut().enumerate() { + *by = i as u8; + } + wgr.commit(255); + + let mut wgr = prod.grant(64).unwrap(); + for (i, by) in wgr.iter_mut().enumerate() { + *by = (i as u8) + 128; + } + wgr.commit(127); + + let rgr = cons.read().unwrap(); + assert_eq!(rgr.len(), 128); + rgr.release(); + + let rgr = cons.read().unwrap(); + assert_eq!(rgr.len(), 64); + rgr.release(); + }); + } + + #[test] + fn frame_undercommit() { + block_on(async { + let bb: BBBuffer<512> = BBBuffer::new(); + let (mut prod, mut cons) = bb.try_split_framed().unwrap(); + + for _ in 0..100_000 { + // Create largeish grants + let mut wgr = prod.grant(128).unwrap(); + for (i, by) in wgr.iter_mut().enumerate() { + *by = i as u8; + } + wgr.commit(13); + + let mut wgr = prod.grant(64).unwrap(); + for (i, by) in wgr.iter_mut().enumerate() { + *by = (i as u8) + 128; + } + wgr.commit(7); + + let mut wgr = prod.grant(32).unwrap(); + for (i, by) in wgr.iter_mut().enumerate() { + *by = (i as u8) + 192; + } + wgr.commit(0); + + let rgr = cons.read().unwrap(); + assert_eq!(rgr.len(), 13); + rgr.release(); + + let rgr = cons.read().unwrap(); + assert_eq!(rgr.len(), 7); + rgr.release(); + + let rgr = cons.read().unwrap(); + assert_eq!(rgr.len(), 0); + rgr.release(); + } + }); + } + + #[test] + fn frame_auto_commit_release() { + block_on(async { + let bb: BBBuffer<256> = BBBuffer::new(); + let (mut prod, mut cons) = bb.try_split_framed().unwrap(); + + for _ in 0..100 { + { + let mut wgr = prod.grant(64).unwrap(); + wgr.to_commit(64); + for (i, by) in wgr.iter_mut().enumerate() { + *by = i as u8; + } + // drop + } + + { + let mut rgr = cons.read().unwrap(); + rgr.auto_release(true); + let rgr = rgr; + + for (i, by) in rgr.iter().enumerate() { + assert_eq!(*by, i as u8); + } + assert_eq!(rgr.len(), 64); + // drop + } + } + + assert!(cons.read().is_none()); + }); + } +} diff --git a/bbqtest/src/async_usage.rs b/bbqtest/src/async_usage.rs new file mode 100644 index 0000000..cad1247 --- /dev/null +++ b/bbqtest/src/async_usage.rs @@ -0,0 +1,169 @@ +#[cfg(test)] +mod tests { + use bbqueue::BBBuffer; + use bbqueue::Error; + use futures::{executor::block_on, future::join}; + + #[test] + fn test_read() { + let bb: BBBuffer<6> = BBBuffer::new(); + let (mut prod, mut cons) = bb.try_split().unwrap(); + + { + let mut grant = prod.grant_exact(4).unwrap(); + let buf = grant.buf(); + buf[0] = 0xDE; + buf[1] = 0xAD; + buf[2] = 0xC0; + buf[3] = 0xDE; + grant.commit(4); + } + + let r_grant = block_on(cons.read_async()).unwrap(); + + assert_eq!(4, r_grant.len()); + assert_eq!(r_grant[0], 0xDE); + assert_eq!(r_grant[1], 0xAD); + assert_eq!(r_grant[2], 0xC0); + assert_eq!(r_grant[3], 0xDE); + } + + #[test] + fn test_write() { + let bb: BBBuffer<6> = BBBuffer::new(); + let (mut prod, mut cons) = bb.try_split().unwrap(); + + let mut w_grant = block_on(prod.grant_exact_async(4)).unwrap(); + assert_eq!(4, w_grant.len()); + w_grant[0] = 0xDE; + w_grant[1] = 0xAD; + w_grant[2] = 0xC0; + w_grant[3] = 0xDE; + w_grant.commit(4); + + let grant = cons.read().unwrap(); + let rx_buf = grant.buf(); + assert_eq!(4, rx_buf.len()); + assert_eq!(rx_buf[0], 0xDE); + assert_eq!(rx_buf[1], 0xAD); + assert_eq!(rx_buf[2], 0xC0); + assert_eq!(rx_buf[3], 0xDE); + } + + #[test] + fn test_read_after_write() { + let bb: BBBuffer<6> = BBBuffer::new(); + let (mut prod, mut cons) = bb.try_split().unwrap(); + + let read_fut = async { + let r_grant = cons.read_async().await.unwrap(); + r_grant.release(4); + + let time = std::time::Instant::now(); // TODO: Remove time dependence in test + #[cfg(feature = "verbose")] + println!("Read completed at {:?}", time); + time + }; + + let write_fut = async { + let mut w_grant = prod.grant_exact_async(4).await.unwrap(); + w_grant[0] = 0xDE; + w_grant[1] = 0xAD; + w_grant[2] = 0xC0; + w_grant[3] = 0xDE; + w_grant.commit(4); + + let time = std::time::Instant::now(); // TODO: Remove time dependence in test + #[cfg(feature = "verbose")] + println!("Write completed at {:?}", time); + time + }; + + let (r_time, w_time) = block_on(join(read_fut, write_fut)); + assert!(r_time > w_time) + } + + #[test] + fn grant_exact_too_big() { + let bb: BBBuffer<6> = BBBuffer::new(); + let (mut prod, mut _cons) = bb.try_split().unwrap(); + let w_grant_res = block_on(async { prod.grant_exact_async(8).await }); + + assert_eq!(w_grant_res.unwrap_err(), Error::InsufficientSize); + } + + #[test] + fn grant_exact_loop() { + let bb: BBBuffer<6> = BBBuffer::new(); + let (mut prod, mut cons) = bb.try_split().unwrap(); + let w_grant = prod.grant_exact(4).unwrap(); + w_grant.commit(4); + + let read_fut = async { + let r_grant = cons.read_async().await.unwrap(); + r_grant.release(4); + + let time = std::time::Instant::now(); // TODO: Remove time dependence in test + #[cfg(feature = "verbose")] + println!("Read completed at {:?}", time); + time + }; + + let write_fut = async { + let w_grant = prod.grant_exact_async(3).await.unwrap(); + w_grant.commit(4); + + let time = std::time::Instant::now(); // TODO: Remove time dependence in test + #[cfg(feature = "verbose")] + println!("Write completed at {:?}", time); + time + }; + + let (w_time, r_time) = block_on(join(write_fut, read_fut)); + assert!(r_time < w_time); + } + + #[test] + fn grant_exact_loop_too_big() { + let bb: BBBuffer<6> = BBBuffer::new(); + let (mut prod, mut cons) = bb.try_split().unwrap(); + let w_grant = prod.grant_exact(4).unwrap(); + w_grant.commit(4); + + let read_fut = async { + let r_grant = cons.read_async().await.unwrap(); + r_grant.release(4); + }; + + let write_fut = async { + let w_grant = prod.grant_exact_async(4).await; + assert_eq!(w_grant.unwrap_err(), Error::InsufficientSize); + }; + + block_on(join(write_fut, read_fut)); + } + + #[test] + fn write_cancelled() { + let bb: BBBuffer<6> = BBBuffer::new(); + let (mut prod, mut cons) = bb.try_split().unwrap(); + let w_grant_fut = prod.grant_exact_async(6); + drop(w_grant_fut); + let r_grant = cons.read(); + assert_eq!(r_grant.unwrap_err(), Error::InsufficientSize); + } + + #[test] + fn read_cancelled() { + let bb: BBBuffer<6> = BBBuffer::new(); + let (mut prod, mut cons) = bb.try_split().unwrap(); + let w_grant = prod.grant_exact(6).unwrap(); + w_grant.commit(6); + + let r_grant_fut = cons.read_async(); + drop(r_grant_fut); + + let w_grant = prod.grant_max_remaining(4); + assert_eq!(w_grant.unwrap_err(), Error::InsufficientSize); + } +} diff --git a/bbqtest/src/lib.rs b/bbqtest/src/lib.rs index 5f1843e..5ba5238 100644 --- a/bbqtest/src/lib.rs +++ b/bbqtest/src/lib.rs @@ -1,6 +1,8 @@ //! NOTE: this crate is really just a shim for testing //! the other no-std crate. +mod async_framed; +mod async_usage; mod framed; mod multi_thread; mod ring_around_the_senders; diff --git a/core/src/bbbuffer.rs b/core/src/bbbuffer.rs index 1492a55..5276b3b 100644 --- a/core/src/bbbuffer.rs +++ b/core/src/bbbuffer.rs @@ -1,13 +1,16 @@ use crate::{ framed::{FrameConsumer, FrameProducer}, + waker::WakerStorage, Error, Result, }; use core::{ cell::UnsafeCell, cmp::min, + future::Future, marker::PhantomData, mem::{forget, transmute, MaybeUninit}, ops::{Deref, DerefMut}, + pin::Pin, ptr::NonNull, result::Result as CoreResult, slice::from_raw_parts_mut, @@ -15,7 +18,9 @@ use core::{ AtomicBool, AtomicUsize, Ordering::{AcqRel, Acquire, Release}, }, + task::{Context, Poll}, }; + #[derive(Debug)] /// A backing structure for a BBQueue. Can be used to create either /// a BBQueue or a split Producer/Consumer pair @@ -49,6 +54,14 @@ pub struct BBBuffer { /// Have we already split? already_split: AtomicBool, + + /// Read waker for async support + /// Woken up when a commit is done + read_waker: WakerStorage, + + /// Write waker for async support + /// Woken up when a release is done + write_waker: WakerStorage, } unsafe impl Sync for BBBuffer {} @@ -277,6 +290,12 @@ impl BBBuffer { /// We haven't split at the start already_split: AtomicBool::new(false), + + /// Shared between reader and writer. + read_waker: WakerStorage::new(), + + /// Shared between reader and writer + write_waker: WakerStorage::new(), } } } @@ -507,6 +526,30 @@ impl<'a, const N: usize> Producer<'a, N> { to_commit: 0, }) } + + /// Async version of [Self::grant_exact]. + /// If the buffer can enventually provide a buffer of the requested size, the future + /// will wait for the buffer to be read so the exact buffer can be requested. + /// + /// If it's not possible to request it, an error is returned. + /// For example, given a buffer + /// [0|1|2|3|4|5|6|7|8] + /// ^ + /// Write pointer + /// We cannot request a size of size 7, since we would loop over the read pointer + /// even if the buffer is empty. In this case, an error is returned + pub fn grant_exact_async(&'_ mut self, sz: usize) -> GrantExactFuture<'a, '_, N> { + GrantExactFuture { prod: self, sz } + } + + /// Async version of [Self::grant_max_remaining]. + /// Will wait for the buffer to at least 1 byte available, as soon as it does, return the grant. + pub fn grant_max_remaining_async( + &'_ mut self, + sz: usize, + ) -> GrantMaxRemainingFuture<'a, '_, N> { + GrantMaxRemainingFuture { prod: self, sz } + } } /// `Consumer` is the primary interface for reading data from a `BBBuffer`. @@ -652,6 +695,18 @@ impl<'a, const N: usize> Consumer<'a, N> { to_release: 0, }) } + + /// Async version of [Self::read]. + /// Will wait for the buffer to have data to read. When data is available, the grant is returned. + pub fn read_async<'b>(&'b mut self) -> GrantReadFuture<'a, 'b, N> { + GrantReadFuture { cons: self } + } + + /// Async version of [Self::split_read]. + /// Will wait just like [Self::read_async], but returns the split grant to obtain all the available data. + pub fn split_read_async<'b>(&'b mut self) -> GrantSplitReadFuture<'a, 'b, N> { + GrantSplitReadFuture { cons: self } + } } impl BBBuffer { @@ -841,6 +896,10 @@ impl<'a, const N: usize> GrantW<'a, N> { // Allow subsequent grants inner.write_in_progress.store(false, Release); + + unsafe { + self.bbq.as_mut().read_waker.wake(); + }; } /// Configures the amount of bytes to be commited on drop. @@ -947,6 +1006,7 @@ impl<'a, const N: usize> GrantR<'a, N> { let _ = atomic::fetch_add(&inner.read, used, Release); inner.read_in_progress.store(false, Release); + unsafe { self.bbq.as_mut().write_waker.wake() }; } /// Configures the amount of bytes to be released on drop. @@ -1095,6 +1155,113 @@ impl<'a, const N: usize> DerefMut for GrantR<'a, N> { } } +/// Future returned [Producer::grant_exact_async] +pub struct GrantExactFuture<'a, 'b, const N: usize> { + prod: &'b mut Producer<'a, N>, + sz: usize, +} + +impl<'a, 'b, const N: usize> Future for GrantExactFuture<'a, 'b, N> { + type Output = Result>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // Check if it's event possible to get the requested size + // Ex: + // [0|1|2|3|4|5|6|7|8] + // ^ + // Write pointer + // Check if the buffer from 6 to 8 satisfies or if the buffer from 0 to 5 does. + // If so, create the future, if not, we need the return since the future will never resolve. + // Ideally, we could just wait for all the read to complete and reset the read and write to 0, but that is currently not supported + let write = unsafe { self.prod.bbq.as_ref().write.load(Acquire) }; + if self.sz > N || (self.sz > N - write && self.sz >= write) { + return Poll::Ready(Err(Error::InsufficientSize)); + } + + let sz = self.sz; + + match self.prod.grant_exact(sz) { + Ok(grant) => Poll::Ready(Ok(grant)), + Err(e) => match e { + Error::GrantInProgress | Error::InsufficientSize => { + unsafe { self.prod.bbq.as_mut().write_waker.set(cx.waker()) }; + Poll::Pending + } + _ => Poll::Ready(Err(e)), + }, + } + } +} + +/// Future returned [Producer::grant_max_remaining_async] +pub struct GrantMaxRemainingFuture<'a, 'b, const N: usize> { + prod: &'b mut Producer<'a, N>, + sz: usize, +} + +impl<'a, 'b, const N: usize> Future for GrantMaxRemainingFuture<'a, 'b, N> { + type Output = Result>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let sz = self.sz; + + match self.prod.grant_max_remaining(sz) { + Ok(grant) => Poll::Ready(Ok(grant)), + Err(e) => match e { + Error::GrantInProgress | Error::InsufficientSize => { + unsafe { self.prod.bbq.as_mut().write_waker.set(cx.waker()) }; + Poll::Pending + } + _ => Poll::Ready(Err(e)), + }, + } + } +} + +/// Future returned [Consumer::read_async] +pub struct GrantReadFuture<'a, 'b, const N: usize> { + cons: &'b mut Consumer<'a, N>, +} + +impl<'a, 'b, const N: usize> Future for GrantReadFuture<'a, 'b, N> { + type Output = Result>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.cons.read() { + Ok(grant) => Poll::Ready(Ok(grant)), + Err(e) => match e { + Error::InsufficientSize | Error::GrantInProgress => { + unsafe { self.cons.bbq.as_mut().read_waker.set(cx.waker()) }; + Poll::Pending + } + _ => Poll::Ready(Err(e)), + }, + } + } +} + +/// Future returned [Consumer::split_read_async] +pub struct GrantSplitReadFuture<'a, 'b, const N: usize> { + cons: &'b mut Consumer<'a, N>, +} + +impl<'a, 'b, const N: usize> Future for GrantSplitReadFuture<'a, 'b, N> { + type Output = Result>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.cons.split_read() { + Ok(grant) => Poll::Ready(Ok(grant)), + Err(e) => match e { + Error::InsufficientSize | Error::GrantInProgress => { + unsafe { self.cons.bbq.as_mut().read_waker.set(cx.waker()) }; + Poll::Pending + } + _ => Poll::Ready(Err(e)), + }, + } + } +} + #[cfg(feature = "thumbv6")] mod atomic { use core::sync::atomic::{ diff --git a/core/src/framed.rs b/core/src/framed.rs index f0d4449..4855603 100644 --- a/core/src/framed.rs +++ b/core/src/framed.rs @@ -99,6 +99,15 @@ impl<'a, const N: usize> FrameProducer<'a, N> { hdr_len: hdr_len as u8, }) } + + /// Async version of [Self::grant] + pub async fn grant_async(&mut self, max_sz: usize) -> Result> { + let hdr_len = encoded_len(max_sz); + Ok(FrameGrantW { + grant_w: self.producer.grant_exact_async(max_sz + hdr_len).await?, + hdr_len: hdr_len as u8, + }) + } } /// A consumer of Framed data @@ -132,6 +141,32 @@ impl<'a, const N: usize> FrameConsumer<'a, N> { Some(FrameGrantR { grant_r, hdr_len }) } + + /// Async version of [Self::read] + pub async fn read_async(&mut self) -> Result> { + // Get all available bytes. We never wrap a frame around, + // so if a header is available, the whole frame will be. + let mut grant_r = self.consumer.read_async().await?; + + // Additionally, we never commit less than a full frame with + // a header, so if we have ANY data, we'll have a full header + // and frame. `Consumer::read` will return an Error when + // there are 0 bytes available. + + // The header consists of a single usize, encoded in native + // endianess order + let frame_len = decode_usize(&grant_r); + let hdr_len = decoded_len(grant_r[0]); + let total_len = frame_len + hdr_len; + let hdr_len = hdr_len as u8; + + debug_assert!(grant_r.len() >= total_len); + + // Reduce the grant down to the size of the frame with a header + grant_r.shrink(total_len); + + Ok(FrameGrantR { grant_r, hdr_len }) + } } /// A write grant for a single frame diff --git a/core/src/lib.rs b/core/src/lib.rs index ea2348d..4850442 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -104,6 +104,7 @@ pub use bbbuffer::*; pub mod framed; mod vusize; +mod waker; use core::result::Result as CoreResult; diff --git a/core/src/waker.rs b/core/src/waker.rs new file mode 100644 index 0000000..538bbb8 --- /dev/null +++ b/core/src/waker.rs @@ -0,0 +1,41 @@ +use core::task::Waker; + +/// A waker storage. Can be initialized without a waker, and a waker can be set on an eventual `poll` call. +/// The waker can be set and woken up. +#[derive(Debug)] +pub struct WakerStorage { + waker: Option, +} + +impl WakerStorage { + pub const fn new() -> Self { + WakerStorage { waker: None } + } + + /// Set the waker, will wake the previous one if one was already stored. + pub fn set(&mut self, new: &Waker) { + match self.waker.take() { + Some(prev) => { + // No need to clone if they wake the same task. + if prev.will_wake(new) { + return; + } + // Wake the previous waker and replace it + else { + prev.wake(); + self.waker.replace(new.clone()); + } + } + None => { + self.waker.replace(new.clone()); + } + } + } + + /// Wake the waker if one is available + pub fn wake(&mut self) { + if let Some(waker) = self.waker.take() { + waker.wake() + } + } +}