@@ -156,7 +156,7 @@ func MsgPipe() (*MsgPipeRW, *MsgPipeRW) {
156156	var  (
157157		c1 , c2   =  make (chan  Msg ), make (chan  Msg )
158158		closing  =  make (chan  struct {})
159- 		closed   =  new (atomic.Int32 )
159+ 		closed   =  new (atomic.Bool )
160160		rw1      =  & MsgPipeRW {c1 , c2 , closing , closed }
161161		rw2      =  & MsgPipeRW {c2 , c1 , closing , closed }
162162	)
@@ -172,13 +172,13 @@ type MsgPipeRW struct {
172172	w        chan <-  Msg 
173173	r        <- chan  Msg 
174174	closing  chan  struct {}
175- 	closed   * atomic.Int32 
175+ 	closed   * atomic.Bool 
176176}
177177
178178// WriteMsg sends a message on the pipe. 
179179// It blocks until the receiver has consumed the message payload. 
180180func  (p  * MsgPipeRW ) WriteMsg (msg  Msg ) error  {
181- 	if  p .closed .Load ()  ==   0  {
181+ 	if  ! p .closed .Load () {
182182		consumed  :=  make (chan  struct {}, 1 )
183183		msg .Payload  =  & eofSignal {msg .Payload , msg .Size , consumed }
184184		select  {
@@ -199,7 +199,7 @@ func (p *MsgPipeRW) WriteMsg(msg Msg) error {
199199
200200// ReadMsg returns a message sent on the other end of the pipe. 
201201func  (p  * MsgPipeRW ) ReadMsg () (Msg , error ) {
202- 	if  p .closed .Load ()  ==   0  {
202+ 	if  ! p .closed .Load () {
203203		select  {
204204		case  msg  :=  <- p .r :
205205			return  msg , nil 
@@ -213,9 +213,8 @@ func (p *MsgPipeRW) ReadMsg() (Msg, error) {
213213// of the pipe. They will return ErrPipeClosed. Close also 
214214// interrupts any reads from a message payload. 
215215func  (p  * MsgPipeRW ) Close () error  {
216- 	if  p .closed .Add ( 1 )  !=   1  {
216+ 	if  p .closed .Swap ( true )  {
217217		// someone else is already closing 
218- 		p .closed .Store (1 ) // avoid overflow 
219218		return  nil 
220219	}
221220	close (p .closing )
0 commit comments