@@ -30,11 +30,12 @@ package runtime
3030// non-select operations) so that the select operation knows which case did
3131// proceed.
3232// The value is at the same time also a way that goroutines can be the first
33- // (and only) goroutine to 'take' a channel operation to change it from
34- // 'waiting' to any other value. This is important for the select statement
35- // because multiple goroutines could try to let different channels in the
36- // select statement proceed at the same time. By using Task.Data, only a
37- // single channel operation in the select statement can proceed.
33+ // (and only) goroutine to 'take' a channel operation using an atomic CAS
34+ // operation to change it from 'waiting' to any other value. This is important
35+ // for the select statement because multiple goroutines could try to let
36+ // different channels in the select statement proceed at the same time. By
37+ // using Task.Data, only a single channel operation in the select statement
38+ // can proceed.
3839// - It is possible for the channel queues to contain already-processed senders
3940// or receivers. This can happen when the select statement managed to proceed
4041// but the goroutine doing the select has not yet cleaned up the stale queue
@@ -49,15 +50,17 @@ import (
4950
5051// The runtime implementation of the Go 'chan' type.
5152type channel struct {
52- closed bool
53- elementSize uintptr
54- bufCap uintptr // 'cap'
55- bufLen uintptr // 'len'
56- bufHead uintptr
57- bufTail uintptr
58- senders chanQueue
59- receivers chanQueue
60- buf unsafe.Pointer
53+ closed bool
54+ selectLocked bool
55+ elementSize uintptr
56+ bufCap uintptr // 'cap'
57+ bufLen uintptr // 'len'
58+ bufHead uintptr
59+ bufTail uintptr
60+ senders chanQueue
61+ receivers chanQueue
62+ lock task.PMutex
63+ buf unsafe.Pointer
6164}
6265
6366const (
@@ -73,7 +76,8 @@ type chanQueue struct {
7376
7477// Pus the next channel operation to the queue. All appropriate fields must have
7578// been initialized already.
76- // This function must be called with interrupts disabled.
79+ // This function must be called with interrupts disabled and the channel lock
80+ // held.
7781func (q * chanQueue ) push (node * channelOp ) {
7882 node .next = q .first
7983 q .first = node
@@ -99,16 +103,17 @@ func (q *chanQueue) pop(chanOp uint32) *channelOp {
99103 newDataValue := chanOp | popped .index << 2
100104
101105 // Try to be the first to proceed with this goroutine.
102- if popped .task .DataUint32 () == chanOperationWaiting {
103- popped . task . SetDataUint32 ( newDataValue )
106+ swapped := popped .task .DataAtomicUint32 (). CompareAndSwap ( 0 , newDataValue )
107+ if swapped {
104108 return popped
105109 }
106110 }
107111}
108112
109113// Remove the given to-be-removed node from the queue if it is part of the
110114// queue. If there are multiple, only one will be removed.
111- // This function must be called with interrupts disabled.
115+ // This function must be called with interrupts disabled and the channel lock
116+ // held.
112117func (q * chanQueue ) remove (remove * channelOp ) {
113118 n := & q .first
114119 for * n != nil {
@@ -159,8 +164,8 @@ func chanCap(c *channel) int {
159164}
160165
161166// Push the value to the channel buffer array, for a send operation.
162- // This function may only be called when interrupts are disabled and it is known
163- // there is space available in the buffer.
167+ // This function may only be called when interrupts are disabled, the channel is
168+ // locked and it is known there is space available in the buffer.
164169func (ch * channel ) bufferPush (value unsafe.Pointer ) {
165170 elemAddr := unsafe .Add (ch .buf , ch .bufHead * ch .elementSize )
166171 ch .bufLen ++
@@ -174,8 +179,8 @@ func (ch *channel) bufferPush(value unsafe.Pointer) {
174179
175180// Pop a value from the channel buffer and store it in the 'value' pointer, for
176181// a receive operation.
177- // This function may only be called when interrupts are disabled and it is known
178- // there is at least one value available in the buffer.
182+ // This function may only be called when interrupts are disabled, the channel is
183+ // locked and it is known there is at least one value available in the buffer.
179184func (ch * channel ) bufferPop (value unsafe.Pointer ) {
180185 elemAddr := unsafe .Add (ch .buf , ch .bufTail * ch .elementSize )
181186 ch .bufLen --
@@ -191,7 +196,8 @@ func (ch *channel) bufferPop(value unsafe.Pointer) {
191196}
192197
193198// Try to proceed with this send operation without blocking, and return whether
194- // the send succeeded. Interrupts must be disabled when calling this function.
199+ // the send succeeded. Interrupts must be disabled and the lock must be held
200+ // when calling this function.
195201func (ch * channel ) trySend (value unsafe.Pointer ) bool {
196202 // To make sure we send values in the correct order, we can only send
197203 // directly to a receiver when there are no values in the buffer.
@@ -230,9 +236,11 @@ func chanSend(ch *channel, value unsafe.Pointer, op *channelOp) {
230236 }
231237
232238 mask := interrupt .Disable ()
239+ ch .lock .Lock ()
233240
234241 // See whether we can proceed immediately, and if so, return early.
235242 if ch .trySend (value ) {
243+ ch .lock .Unlock ()
236244 interrupt .Restore (mask )
237245 return
238246 }
@@ -244,9 +252,12 @@ func chanSend(ch *channel, value unsafe.Pointer, op *channelOp) {
244252 op .index = 0
245253 op .value = value
246254 ch .senders .push (op )
255+ ch .lock .Unlock ()
247256 interrupt .Restore (mask )
248257
249258 // Wait until this goroutine is resumed.
259+ // It might be resumed after Unlock() and before Pause(). In that case,
260+ // because we use semaphores, the Pause() will continue immediately.
250261 task .Pause ()
251262
252263 // Check whether the sent happened normally (not because the channel was
@@ -258,8 +269,8 @@ func chanSend(ch *channel, value unsafe.Pointer, op *channelOp) {
258269}
259270
260271// Try to proceed with this receive operation without blocking, and return
261- // whether the receive operation succeeded. Interrupts must be disabled when
262- // calling this function.
272+ // whether the receive operation succeeded. Interrupts must be disabled and the
273+ // lock must be held when calling this function.
263274func (ch * channel ) tryRecv (value unsafe.Pointer ) (received , ok bool ) {
264275 // To make sure we keep the values in the channel in the correct order, we
265276 // first have to read values from the buffer before we can look at the
@@ -303,8 +314,10 @@ func chanRecv(ch *channel, value unsafe.Pointer, op *channelOp) bool {
303314 }
304315
305316 mask := interrupt .Disable ()
317+ ch .lock .Lock ()
306318
307319 if received , ok := ch .tryRecv (value ); received {
320+ ch .lock .Unlock ()
308321 interrupt .Restore (mask )
309322 return ok
310323 }
@@ -317,6 +330,7 @@ func chanRecv(ch *channel, value unsafe.Pointer, op *channelOp) bool {
317330 op .task = t
318331 op .index = 0
319332 ch .receivers .push (op )
333+ ch .lock .Unlock ()
320334 interrupt .Restore (mask )
321335
322336 // Wait until the goroutine is resumed.
@@ -335,9 +349,11 @@ func chanClose(ch *channel) {
335349 }
336350
337351 mask := interrupt .Disable ()
352+ ch .lock .Lock ()
338353
339354 if ch .closed {
340355 // Not allowed by the language spec.
356+ ch .lock .Unlock ()
341357 interrupt .Restore (mask )
342358 runtimePanic ("close of closed channel" )
343359 }
@@ -370,14 +386,56 @@ func chanClose(ch *channel) {
370386
371387 ch .closed = true
372388
389+ ch .lock .Unlock ()
373390 interrupt .Restore (mask )
374391}
375392
393+ // We currently use a global select lock to avoid deadlocks while locking each
394+ // individual channel in the select. Without this global lock, two select
395+ // operations that have a different order of the same channels could end up in a
396+ // deadlock. This global lock is inefficient if there are many select operations
397+ // happening in parallel, but gets the job done.
398+ //
399+ // If this becomes a performance issue, we can see how the Go runtime does this.
400+ // I think it does this by sorting all states by channel address and then
401+ // locking them in that order to avoid this deadlock.
402+ var chanSelectLock task.PMutex
403+
404+ // Lock all channels (taking care to skip duplicate channels).
405+ func lockAllStates (states []chanSelectState ) {
406+ if ! hasParallelism {
407+ return
408+ }
409+ for _ , state := range states {
410+ if state .ch != nil && ! state .ch .selectLocked {
411+ state .ch .lock .Lock ()
412+ state .ch .selectLocked = true
413+ }
414+ }
415+ }
416+
417+ // Unlock all channels (taking care to skip duplicate channels).
418+ func unlockAllStates (states []chanSelectState ) {
419+ if ! hasParallelism {
420+ return
421+ }
422+ for _ , state := range states {
423+ if state .ch != nil && state .ch .selectLocked {
424+ state .ch .lock .Unlock ()
425+ state .ch .selectLocked = false
426+ }
427+ }
428+ }
429+
376430// chanSelect implements blocking or non-blocking select operations.
377431// The 'ops' slice must be set if (and only if) this is a blocking select.
378432func chanSelect (recvbuf unsafe.Pointer , states []chanSelectState , ops []channelOp ) (uint32 , bool ) {
379433 mask := interrupt .Disable ()
380434
435+ // Lock everything.
436+ chanSelectLock .Lock ()
437+ lockAllStates (states )
438+
381439 const selectNoIndex = ^ uint32 (0 )
382440 selectIndex := selectNoIndex
383441 selectOk := true
@@ -409,6 +467,8 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO
409467 // return early.
410468 blocking := len (ops ) != 0
411469 if selectIndex != selectNoIndex || ! blocking {
470+ unlockAllStates (states )
471+ chanSelectLock .Unlock ()
412472 interrupt .Restore (mask )
413473 return selectIndex , selectOk
414474 }
@@ -417,8 +477,8 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO
417477 // become more complicated.
418478 // We add ourselves as a sender/receiver to every channel, and wait for the
419479 // first one to complete. Only one will successfully complete, because
420- // senders and receivers will check t.Data for the state so that only one
421- // will be able to "take" this select operation.
480+ // senders and receivers use a compare-and-exchange atomic operation on
481+ // t.Data so that only one will be able to "take" this select operation.
422482 t := task .Current ()
423483 t .Ptr = recvbuf
424484 t .SetDataUint32 (chanOperationWaiting )
@@ -438,13 +498,17 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO
438498 }
439499
440500 // Now we wait until one of the send/receive operations can proceed.
501+ unlockAllStates (states )
502+ chanSelectLock .Unlock ()
441503 interrupt .Restore (mask )
442504 task .Pause ()
443505
444506 // Resumed, so one channel operation must have progressed.
445507
446508 // Make sure all channel ops are removed from the senders/receivers
447509 // queue before we return and the memory of them becomes invalid.
510+ chanSelectLock .Lock ()
511+ lockAllStates (states )
448512 for i , state := range states {
449513 if state .ch == nil {
450514 continue
@@ -458,6 +522,8 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO
458522 }
459523 interrupt .Restore (mask )
460524 }
525+ unlockAllStates (states )
526+ chanSelectLock .Unlock ()
461527
462528 // Pull the return values out of t.Data (which contains two bitfields).
463529 selectIndex = t .DataUint32 () >> 2
0 commit comments