Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 37 additions & 10 deletions futures-channel/src/mpsc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,11 @@ mod queue;
#[cfg(feature = "sink")]
mod sink_impl;

#[derive(Debug)]
struct UnboundedSenderInner<T> {
// Channel state shared between the sender and receiver.
inner: Arc<UnboundedInner<T>>,
}

#[derive(Debug)]
struct BoundedSenderInner<T> {
// Channel state shared between the sender and receiver.
inner: Arc<BoundedInner<T>>,
Expand All @@ -122,13 +120,11 @@ impl<T> Unpin for BoundedSenderInner<T> {}
/// The transmission end of a bounded mpsc channel.
///
/// This value is created by the [`channel`](channel) function.
#[derive(Debug)]
pub struct Sender<T>(Option<BoundedSenderInner<T>>);

/// The transmission end of an unbounded mpsc channel.
///
/// This value is created by the [`unbounded`](unbounded) function.
#[derive(Debug)]
pub struct UnboundedSender<T>(Option<UnboundedSenderInner<T>>);

trait AssertKinds: Send + Sync + Clone {}
Expand All @@ -137,15 +133,13 @@ impl AssertKinds for UnboundedSender<u32> {}
/// The receiving end of a bounded mpsc channel.
///
/// This value is created by the [`channel`](channel) function.
#[derive(Debug)]
pub struct Receiver<T> {
inner: Option<Arc<BoundedInner<T>>>,
}

/// The receiving end of an unbounded mpsc channel.
///
/// This value is created by the [`unbounded`](unbounded) function.
#[derive(Debug)]
pub struct UnboundedReceiver<T> {
inner: Option<Arc<UnboundedInner<T>>>,
}
Expand Down Expand Up @@ -255,7 +249,6 @@ impl fmt::Display for TryRecvError {

impl std::error::Error for TryRecvError {}

#[derive(Debug)]
struct UnboundedInner<T> {
// Internal channel state. Consists of the number of messages stored in the
// channel as well as a flag signalling that the channel is closed.
Expand All @@ -271,7 +264,6 @@ struct UnboundedInner<T> {
recv_task: AtomicWaker,
}

#[derive(Debug)]
struct BoundedInner<T> {
// Max buffer size of the channel.
buffer: usize,
Expand All @@ -294,7 +286,7 @@ struct BoundedInner<T> {
}

// Struct representation of `Inner::state`.
#[derive(Debug, Clone, Copy)]
#[derive(Clone, Copy)]
struct State {
// `true` when the channel is open
is_open: bool,
Expand All @@ -318,7 +310,6 @@ const MAX_CAPACITY: usize = !(OPEN_MASK);
const MAX_BUFFER: usize = MAX_CAPACITY >> 1;

// Sent to the consumer to wake up blocked producers
#[derive(Debug)]
struct SenderTask {
task: Option<Waker>,
is_parked: bool,
Expand Down Expand Up @@ -941,6 +932,18 @@ impl<T> Drop for BoundedSenderInner<T> {
}
}

impl<T> fmt::Debug for Sender<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Sender").field("closed", &self.is_closed()).finish()
}
}

impl<T> fmt::Debug for UnboundedSender<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("UnboundedSender").field("closed", &self.is_closed()).finish()
}
}

/*
*
* ===== impl Receiver =====
Expand Down Expand Up @@ -1101,6 +1104,18 @@ impl<T> Drop for Receiver<T> {
}
}

impl<T> fmt::Debug for Receiver<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let closed = if let Some(ref inner) = self.inner {
decode_state(inner.state.load(SeqCst)).is_closed()
} else {
false
};

f.debug_struct("Receiver").field("closed", &closed).finish()
}
}

impl<T> UnboundedReceiver<T> {
/// Closes the receiving half of a channel, without dropping it.
///
Expand Down Expand Up @@ -1233,6 +1248,18 @@ impl<T> Drop for UnboundedReceiver<T> {
}
}

impl<T> fmt::Debug for UnboundedReceiver<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let closed = if let Some(ref inner) = self.inner {
decode_state(inner.state.load(SeqCst)).is_closed()
} else {
false
};

f.debug_struct("Receiver").field("closed", &closed).finish()
}
}

/*
*
* ===== impl Inner =====
Expand Down
2 changes: 0 additions & 2 deletions futures-channel/src/mpsc/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ pub(super) enum PopResult<T> {
Inconsistent,
}

#[derive(Debug)]
struct Node<T> {
next: AtomicPtr<Self>,
value: Option<T>,
Expand All @@ -70,7 +69,6 @@ struct Node<T> {
/// The multi-producer single-consumer structure. This is not cloneable, but it
/// may be safely shared so long as it is guaranteed that there is only one
/// popper at a time (many pushers are allowed).
#[derive(Debug)]
pub(super) struct Queue<T> {
head: AtomicPtr<Node<T>>,
tail: UnsafeCell<*mut Node<T>>,
Expand Down