Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions p2p/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func MsgPipe() (*MsgPipeRW, *MsgPipeRW) {
var (
c1, c2 = make(chan Msg), make(chan Msg)
closing = make(chan struct{})
closed = new(int32)
closed = new(atomic.Bool)
rw1 = &MsgPipeRW{c1, c2, closing, closed}
rw2 = &MsgPipeRW{c2, c1, closing, closed}
)
Expand All @@ -172,13 +172,13 @@ type MsgPipeRW struct {
w chan<- Msg
r <-chan Msg
closing chan struct{}
closed *int32
closed *atomic.Bool
}

// WriteMsg sends a message on the pipe.
// It blocks until the receiver has consumed the message payload.
func (p *MsgPipeRW) WriteMsg(msg Msg) error {
if atomic.LoadInt32(p.closed) == 0 {
if !p.closed.Load() {
consumed := make(chan struct{}, 1)
msg.Payload = &eofSignal{msg.Payload, msg.Size, consumed}
select {
Expand All @@ -199,7 +199,7 @@ func (p *MsgPipeRW) WriteMsg(msg Msg) error {

// ReadMsg returns a message sent on the other end of the pipe.
func (p *MsgPipeRW) ReadMsg() (Msg, error) {
if atomic.LoadInt32(p.closed) == 0 {
if !p.closed.Load() {
select {
case msg := <-p.r:
return msg, nil
Expand All @@ -213,9 +213,8 @@ func (p *MsgPipeRW) ReadMsg() (Msg, error) {
// of the pipe. They will return ErrPipeClosed. Close also
// interrupts any reads from a message payload.
func (p *MsgPipeRW) Close() error {
if atomic.AddInt32(p.closed, 1) != 1 {
if p.closed.Swap(true) {
// someone else is already closing
atomic.StoreInt32(p.closed, 1) // avoid overflow
return nil
}
close(p.closing)
Expand Down