Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bbqtest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ crossbeam-utils = "0.7"
crossbeam = "0.7"
heapless = "0.5"
cfg-if = "0.1"
futures = "0.3"

[[bench]]
name = "benches"
Expand Down
47 changes: 47 additions & 0 deletions bbqtest/src/async_usage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#[cfg(test)]
mod tests {
use bbqueue::{consts::*, BBBuffer};
use futures::executor::block_on;

#[test]
fn test_read() {
let bb: BBBuffer<U6> = BBBuffer::new();
let (mut prod, mut cons) = bb.try_split().unwrap();

{
let mut grant = prod.grant_exact(4).unwrap();
let buf = grant.buf();
buf[0] = 0xDE;
buf[1] = 0xAD;
buf[2] = 0xC0;
buf[3] = 0xDE;
grant.commit(4);
}

let mut rx_buf = [0; 4];
let result = block_on(cons.read_async(&mut rx_buf));

assert_eq!(4, result.unwrap());
assert_eq!(rx_buf[0], 0xDE);
assert_eq!(rx_buf[1], 0xAD);
assert_eq!(rx_buf[2], 0xC0);
assert_eq!(rx_buf[3], 0xDE);
}

#[test]
fn test_write() {
let bb: BBBuffer<U6> = BBBuffer::new();
let (mut prod, mut cons) = bb.try_split().unwrap();

let result = block_on(prod.write_async(&[0xDE, 0xAD, 0xC0, 0xDE]));
assert_eq!(4, result.unwrap());

let grant = cons.read().unwrap();
let rx_buf = grant.buf();
assert_eq!(4, rx_buf.len());
assert_eq!(rx_buf[0], 0xDE);
assert_eq!(rx_buf[1], 0xAD);
assert_eq!(rx_buf[2], 0xC0);
assert_eq!(rx_buf[3], 0xDE);
}
}
1 change: 1 addition & 0 deletions bbqtest/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! NOTE: this crate is really just a shim for testing
//! the other no-std crate.

mod async_usage;
mod framed;
mod multi_thread;
mod ring_around_the_senders;
Expand Down
204 changes: 204 additions & 0 deletions core/src/bbbuffer.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
use crate::{
framed::{FrameConsumer, FrameProducer},
signal::Signal,
Error, Result,
};
use core::{
cell::UnsafeCell,
cmp::min,
future::Future,
marker::PhantomData,
mem::{forget, transmute, MaybeUninit},
ops::{Deref, DerefMut},
pin::Pin,
ptr::NonNull,
result::Result as CoreResult,
slice::from_raw_parts_mut,
sync::atomic::{
AtomicBool, AtomicUsize,
Ordering::{AcqRel, Acquire, Release},
},
task::{Context, Poll},
};
pub use generic_array::typenum::consts;
use generic_array::{ArrayLength, GenericArray};
Expand Down Expand Up @@ -239,6 +243,12 @@ pub struct ConstBBBuffer<A> {

/// Have we already split?
already_split: AtomicBool,

/// Waker for async producer.
producer_waker: Signal<()>,

/// Waker for async consumer.
consumer_waker: Signal<()>,
}

impl<A> ConstBBBuffer<A> {
Expand Down Expand Up @@ -293,6 +303,12 @@ impl<A> ConstBBBuffer<A> {

/// We haven't split at the start
already_split: AtomicBool::new(false),

/// Consumer waker
consumer_waker: Signal::new(),

/// Producer waker
producer_waker: Signal::new(),
}
}
}
Expand Down Expand Up @@ -331,6 +347,17 @@ where

unsafe impl<'a, N> Send for Producer<'a, N> where N: ArrayLength<u8> {}

/// TODO: Documentation for struct
pub struct AsyncWrite<'a, N>
where
N: ArrayLength<u8>,
{
producer: &'a mut Producer<'a, N>,
buffer: &'a [u8],
remaining: usize,
cancelled: bool,
}

impl<'a, N> Producer<'a, N>
where
N: ArrayLength<u8>,
Expand Down Expand Up @@ -529,6 +556,90 @@ where
to_commit: 0,
})
}

/// Asynchronously write data and complete when write is finished. The buffer must outlive the future
/// that is returned.
pub fn write_async(&'a mut self, buffer: &'a [u8]) -> AsyncWrite<'a, N> {
AsyncWrite::new(self, buffer)
}

fn poll(&self, cx: &mut Context<'_>) -> Poll<()> {
let inner = unsafe { &self.bbq.as_ref().0 };
inner.producer_waker.poll_wait(cx)
}

fn notify_consumer(&self) {
let inner = unsafe { &self.bbq.as_ref().0 };
inner.consumer_waker.signal(())
}
}

impl<'a, N> AsyncWrite<'a, N>
where
N: ArrayLength<u8>,
{
fn new(producer: &'a mut Producer<'a, N>, buffer: &'a [u8]) -> AsyncWrite<'a, N> {
Self {
producer,
remaining: buffer.len(),
buffer,
cancelled: false,
}
}

/// Signal that this future is cancelled and should no longer poll data
pub fn cancel(&mut self) {
self.cancelled = true;
}
}

impl<'a, N> Future for AsyncWrite<'a, N>
where
N: ArrayLength<u8>,
{
type Output = Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.cancelled {
Poll::Ready(Ok(self.buffer.len() - self.remaining))
} else {
loop {
let remaining = self.remaining;
match self.producer.grant_max_remaining(remaining) {
Ok(mut grant) => {
let buf = grant.buf();

let wp = self.buffer.len() - self.remaining;
let to_copy = core::cmp::min(self.remaining, buf.len());

buf[..to_copy].copy_from_slice(&self.buffer[wp..wp + to_copy]);

self.remaining -= to_copy;
grant.commit(to_copy);

self.producer.notify_consumer();

if self.remaining == 0 {
return Poll::Ready(Ok(self.buffer.len()));
} else {
match self.producer.poll(cx) {
Poll::Pending => {
return Poll::Pending;
}
_ => {}
}
}
}
Err(Error::InsufficientSize) => match self.producer.poll(cx) {
Poll::Pending => {
return Poll::Pending;
}
_ => {}
},
Err(e) => return Poll::Ready(Err(e)),
}
}
}
}
}

/// `Consumer` is the primary interface for reading data from a `BBBuffer`.
Expand All @@ -542,6 +653,83 @@ where

unsafe impl<'a, N> Send for Consumer<'a, N> where N: ArrayLength<u8> {}

/// TODO: Documentation for struct
pub struct AsyncRead<'a, N>
where
N: ArrayLength<u8>,
{
consumer: &'a mut Consumer<'a, N>,
buffer: &'a mut [u8],
remaining: usize,
cancelled: bool,
}

impl<'a, N> AsyncRead<'a, N>
where
N: ArrayLength<u8>,
{
fn new(consumer: &'a mut Consumer<'a, N>, buffer: &'a mut [u8]) -> Self {
Self {
consumer,
cancelled: false,
remaining: buffer.len(),
buffer,
}
}

/// Signal that this future is cancelled and should no longer poll data
pub fn cancel(&mut self) {
self.cancelled = true;
}
}

impl<'a, N> Future for AsyncRead<'a, N>
where
N: ArrayLength<u8>,
{
type Output = Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.cancelled {
Poll::Ready(Ok(self.buffer.len() - self.remaining))
} else {
loop {
match self.consumer.read() {
Ok(grant) => {
let buf = grant.buf();
let rp = self.buffer.len() - self.remaining;
let to_copy = core::cmp::min(self.remaining, buf.len());

self.buffer[rp..rp + to_copy].copy_from_slice(&buf[..to_copy]);
self.remaining -= to_copy;
grant.release(to_copy);

self.consumer.notify_producer();

if self.remaining == 0 {
return Poll::Ready(Ok(rp + to_copy));
} else {
match self.consumer.poll(cx) {
Poll::Pending => {
return Poll::Pending;
}
_ => {}
}
}
}
// If there was no data available, but we got signaled in the meantime, try again
Err(Error::InsufficientSize) => match self.consumer.poll(cx) {
Poll::Pending => {
return Poll::Pending;
}
_ => {}
},
Err(e) => return Poll::Ready(Err(e)),
}
}
}
}
}

impl<'a, N> Consumer<'a, N>
where
N: ArrayLength<u8>,
Expand Down Expand Up @@ -680,6 +868,22 @@ where
to_release: 0,
})
}

/// Asynchronously read data into a provided buffer until the buffer is filled. The buffer must outlive the future
/// that is returned.
pub fn read_async(&'a mut self, rx_buffer: &'a mut [u8]) -> AsyncRead<'a, N> {
AsyncRead::new(self, rx_buffer)
}

fn poll(&self, cx: &mut Context<'_>) -> Poll<()> {
let inner = unsafe { &self.bbq.as_ref().0 };
inner.consumer_waker.poll_wait(cx)
}

fn notify_producer(&self) {
let inner = unsafe { &self.bbq.as_ref().0 };
inner.producer_waker.signal(())
}
}

impl<N> BBBuffer<N>
Expand Down
1 change: 1 addition & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
#![deny(warnings)]

mod bbbuffer;
mod signal;
pub use bbbuffer::*;

/// There are no longer separate `atomic` and `cm_mutex` modules. You can just use the types at the
Expand Down
Loading