1- //go:build !scheduler.threads
2-
31package runtime
42
53// This file implements the 'chan' type and send/receive/select operations.
@@ -32,11 +30,12 @@ package runtime
3230// non-select operations) so that the select operation knows which case did
3331// proceed.
3432// The value is at the same time also a way that goroutines can be the first
35- // (and only) goroutine to 'take' a channel operation to change it from
36- // 'waiting' to any other value. This is important for the select statement
37- // because multiple goroutines could try to let different channels in the
38- // select statement proceed at the same time. By using Task.Data, only a
39- // single channel operation in the select statement can proceed.
33+ // (and only) goroutine to 'take' a channel operation using an atomic CAS
34+ // operation to change it from 'waiting' to any other value. This is important
35+ // for the select statement because multiple goroutines could try to let
36+ // different channels in the select statement proceed at the same time. By
37+ // using Task.Data, only a single channel operation in the select statement
38+ // can proceed.
4039// - It is possible for the channel queues to contain already-processed senders
4140// or receivers. This can happen when the select statement managed to proceed
4241// but the goroutine doing the select has not yet cleaned up the stale queue
@@ -51,15 +50,17 @@ import (
5150
5251// The runtime implementation of the Go 'chan' type.
5352type channel struct {
54- closed bool
55- elementSize uintptr
56- bufCap uintptr // 'cap'
57- bufLen uintptr // 'len'
58- bufHead uintptr
59- bufTail uintptr
60- senders chanQueue
61- receivers chanQueue
62- buf unsafe.Pointer
53+ closed bool // set to true by chanClose
54+ selectLocked bool // true while a select holds 'lock' (see lockAllStates); lets a channel listed twice in one select be locked only once
55+ elementSize uintptr // stride used to index into buf (bufHead/bufTail are multiplied by it)
56+ bufCap uintptr // 'cap'
57+ bufLen uintptr // 'len'
58+ bufHead uintptr // slot index in buf where bufferPush stores the next value
59+ bufTail uintptr // slot index in buf where bufferPop reads the next value
60+ senders chanQueue // operations queued by chanSend when it cannot proceed immediately
61+ receivers chanQueue // operations queued by chanRecv when it cannot proceed immediately
62+ lock task.PMutex // must be held while touching the queues and the buffer
63+ buf unsafe.Pointer // buffer storage, addressed as buf + index*elementSize
6364}
6465
6566const (
@@ -75,7 +76,8 @@ type chanQueue struct {
7576
7677// Push the next channel operation to the queue. All appropriate fields must have
7778// been initialized already.
78- // This function must be called with interrupts disabled.
79+ // This function must be called with interrupts disabled and the channel lock
80+ // held.
7981func (q * chanQueue ) push (node * channelOp ) {
8082 node .next = q .first
8183 q .first = node
@@ -101,16 +103,17 @@ func (q *chanQueue) pop(chanOp uint32) *channelOp {
101103 newDataValue := chanOp | popped .index << 2
102104
103105 // Try to be the first to proceed with this goroutine.
104- if popped .task .DataUint32 () == chanOperationWaiting {
105- popped . task . SetDataUint32 ( newDataValue )
106+ swapped := popped .task .DataAtomicUint32 (). CompareAndSwap ( 0 , newDataValue )
107+ if swapped {
106108 return popped
107109 }
108110 }
109111}
110112
111113// Remove the given to-be-removed node from the queue if it is part of the
112114// queue. If there are multiple, only one will be removed.
113- // This function must be called with interrupts disabled.
115+ // This function must be called with interrupts disabled and the channel lock
116+ // held.
114117func (q * chanQueue ) remove (remove * channelOp ) {
115118 n := & q .first
116119 for * n != nil {
@@ -161,8 +164,8 @@ func chanCap(c *channel) int {
161164}
162165
163166// Push the value to the channel buffer array, for a send operation.
164- // This function may only be called when interrupts are disabled and it is known
165- // there is space available in the buffer.
167+ // This function may only be called when interrupts are disabled, the channel is
168+ // locked and it is known there is space available in the buffer.
166169func (ch * channel ) bufferPush (value unsafe.Pointer ) {
167170 elemAddr := unsafe .Add (ch .buf , ch .bufHead * ch .elementSize )
168171 ch .bufLen ++
@@ -176,8 +179,8 @@ func (ch *channel) bufferPush(value unsafe.Pointer) {
176179
177180// Pop a value from the channel buffer and store it in the 'value' pointer, for
178181// a receive operation.
179- // This function may only be called when interrupts are disabled and it is known
180- // there is at least one value available in the buffer.
182+ // This function may only be called when interrupts are disabled, the channel is
183+ // locked and it is known there is at least one value available in the buffer.
181184func (ch * channel ) bufferPop (value unsafe.Pointer ) {
182185 elemAddr := unsafe .Add (ch .buf , ch .bufTail * ch .elementSize )
183186 ch .bufLen --
@@ -193,7 +196,8 @@ func (ch *channel) bufferPop(value unsafe.Pointer) {
193196}
194197
195198// Try to proceed with this send operation without blocking, and return whether
196- // the send succeeded. Interrupts must be disabled when calling this function.
199+ // the send succeeded. Interrupts must be disabled and the lock must be held
200+ // when calling this function.
197201func (ch * channel ) trySend (value unsafe.Pointer ) bool {
198202 // To make sure we send values in the correct order, we can only send
199203 // directly to a receiver when there are no values in the buffer.
@@ -232,9 +236,11 @@ func chanSend(ch *channel, value unsafe.Pointer, op *channelOp) {
232236 }
233237
234238 mask := interrupt .Disable ()
239+ ch .lock .Lock ()
235240
236241 // See whether we can proceed immediately, and if so, return early.
237242 if ch .trySend (value ) {
243+ ch .lock .Unlock ()
238244 interrupt .Restore (mask )
239245 return
240246 }
@@ -246,9 +252,12 @@ func chanSend(ch *channel, value unsafe.Pointer, op *channelOp) {
246252 op .index = 0
247253 op .value = value
248254 ch .senders .push (op )
255+ ch .lock .Unlock ()
249256 interrupt .Restore (mask )
250257
251258 // Wait until this goroutine is resumed.
259+ // It might be resumed after Unlock() and before Pause(). In that case,
260+ // because we use semaphores, the Pause() will continue immediately.
252261 task .Pause ()
253262
254263 // Check whether the send happened normally (not because the channel was
@@ -260,8 +269,8 @@ func chanSend(ch *channel, value unsafe.Pointer, op *channelOp) {
260269}
261270
262271// Try to proceed with this receive operation without blocking, and return
263- // whether the receive operation succeeded. Interrupts must be disabled when
264- // calling this function.
272+ // whether the receive operation succeeded. Interrupts must be disabled and the
273+ // lock must be held when calling this function.
265274func (ch * channel ) tryRecv (value unsafe.Pointer ) (received , ok bool ) {
266275 // To make sure we keep the values in the channel in the correct order, we
267276 // first have to read values from the buffer before we can look at the
@@ -305,8 +314,10 @@ func chanRecv(ch *channel, value unsafe.Pointer, op *channelOp) bool {
305314 }
306315
307316 mask := interrupt .Disable ()
317+ ch .lock .Lock ()
308318
309319 if received , ok := ch .tryRecv (value ); received {
320+ ch .lock .Unlock ()
310321 interrupt .Restore (mask )
311322 return ok
312323 }
@@ -319,6 +330,7 @@ func chanRecv(ch *channel, value unsafe.Pointer, op *channelOp) bool {
319330 op .task = t
320331 op .index = 0
321332 ch .receivers .push (op )
333+ ch .lock .Unlock ()
322334 interrupt .Restore (mask )
323335
324336 // Wait until the goroutine is resumed.
@@ -337,9 +349,11 @@ func chanClose(ch *channel) {
337349 }
338350
339351 mask := interrupt .Disable ()
352+ ch .lock .Lock ()
340353
341354 if ch .closed {
342355 // Not allowed by the language spec.
356+ ch .lock .Unlock ()
343357 interrupt .Restore (mask )
344358 runtimePanic ("close of closed channel" )
345359 }
@@ -372,14 +386,56 @@ func chanClose(ch *channel) {
372386
373387 ch .closed = true
374388
389+ ch .lock .Unlock ()
375390 interrupt .Restore (mask )
376391}
377392
393+ // chanSelectLock is a global lock that chanSelect takes before locking the
394+ // individual channels of a select statement. Without this global lock, two
395+ // select operations that list the same channels in a different order could
396+ // deadlock, each holding one channel lock while waiting for the other. This
397+ // is inefficient if many select operations happen in parallel, but it works.
398+ //
399+ // If this becomes a performance issue, we can look at how the Go runtime
400+ // handles it: it appears to sort the select cases by channel address and
401+ // lock the channels in that order, which rules out this deadlock.
402+ var chanSelectLock task.PMutex
403+
404+ // Lock all channels (taking care to skip duplicate channels).
405+ func lockAllStates (states []chanSelectState ) {
406+ if ! hasParallelism {
407+ return
408+ }
409+ for _ , state := range states {
410+ if state .ch != nil && ! state .ch .selectLocked {
411+ state .ch .lock .Lock ()
412+ state .ch .selectLocked = true
413+ }
414+ }
415+ }
416+
417+ // Unlock all channels (taking care to skip duplicate channels).
418+ func unlockAllStates (states []chanSelectState ) {
419+ if ! hasParallelism {
420+ return
421+ }
422+ for _ , state := range states {
423+ if state .ch != nil && state .ch .selectLocked {
424+ state .ch .lock .Unlock ()
425+ state .ch .selectLocked = false
426+ }
427+ }
428+ }
429+
378430// chanSelect implements blocking or non-blocking select operations.
379431// The 'ops' slice must be set if (and only if) this is a blocking select.
380432func chanSelect (recvbuf unsafe.Pointer , states []chanSelectState , ops []channelOp ) (uint32 , bool ) {
381433 mask := interrupt .Disable ()
382434
435+ // Lock everything.
436+ chanSelectLock .Lock ()
437+ lockAllStates (states )
438+
383439 const selectNoIndex = ^ uint32 (0 )
384440 selectIndex := selectNoIndex
385441 selectOk := true
@@ -411,6 +467,8 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO
411467 // return early.
412468 blocking := len (ops ) != 0
413469 if selectIndex != selectNoIndex || ! blocking {
470+ unlockAllStates (states )
471+ chanSelectLock .Unlock ()
414472 interrupt .Restore (mask )
415473 return selectIndex , selectOk
416474 }
@@ -419,8 +477,8 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO
419477 // become more complicated.
420478 // We add ourselves as a sender/receiver to every channel, and wait for the
421479 // first one to complete. Only one will successfully complete, because
422- // senders and receivers will check t.Data for the state so that only one
423- // will be able to "take" this select operation.
480+ // senders and receivers use a compare-and-exchange atomic operation on
481+ // t.Data so that only one will be able to "take" this select operation.
424482 t := task .Current ()
425483 t .Ptr = recvbuf
426484 t .SetDataUint32 (chanOperationWaiting )
@@ -440,13 +498,17 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO
440498 }
441499
442500 // Now we wait until one of the send/receive operations can proceed.
501+ unlockAllStates (states )
502+ chanSelectLock .Unlock ()
443503 interrupt .Restore (mask )
444504 task .Pause ()
445505
446506 // Resumed, so one channel operation must have progressed.
447507
448508 // Make sure all channel ops are removed from the senders/receivers
449509 // queue before we return and the memory of them becomes invalid.
510+ chanSelectLock .Lock ()
511+ lockAllStates (states )
450512 for i , state := range states {
451513 if state .ch == nil {
452514 continue
@@ -460,6 +522,8 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO
460522 }
461523 interrupt .Restore (mask )
462524 }
525+ unlockAllStates (states )
526+ chanSelectLock .Unlock ()
463527
464528 // Pull the return values out of t.Data (which contains two bitfields).
465529 selectIndex = t .DataUint32 () >> 2
0 commit comments