diff --git a/src/examples/core-pinning/main.go b/src/examples/core-pinning/main.go new file mode 100644 index 0000000000..94e7ae1b06 --- /dev/null +++ b/src/examples/core-pinning/main.go @@ -0,0 +1,111 @@ +// This example demonstrates goroutine core pinning on multi-core systems (RP2040/RP2350). +// It shows how to pin goroutines to specific CPU cores and verify their execution. + +//go:build rp2040 || rp2350 + +package main + +import ( + "machine" + "runtime" + "time" +) + +func main() { + time.Sleep(5 * time.Second) + println("=== Core Pinning Example ===") + println("Number of CPU cores:", runtime.NumCPU()) + println("[main] Main starting on core:", machine.CurrentCore()) + println() + + // Example 1: Pin using standard Go API (LockOSThread) + // This pins to whichever core this goroutine is currently running on + runtime.LockOSThread() + println("[main] Pinned using runtime.LockOSThread()") + println("[main] Running on core:", machine.CurrentCore()) + runtime.UnlockOSThread() + println("[main] Unpinned using runtime.UnlockOSThread()") + println() + + // Example 2: Pin to a specific core using machine package + machine.LockCore(0) + println("[main] Explicitly pinned to core 0 using machine.LockCore()") + println() + + // Start a goroutine pinned to core 1 + go core1Worker() + + // Start a goroutine using standard LockOSThread + go standardLockWorker() + + // Start an unpinned goroutine (can run on either core) + go unpinnedWorker() + + // Main loop on core 0 + for i := 0; i < 10; i++ { + println("[main] loop", i, "on CPU", machine.CurrentCore()) + time.Sleep(500 * time.Millisecond) + } + + // Unpin and let main run on any core + machine.UnlockCore() + println() + println("[main] Unpinned using machine.UnlockCore()") + + // Continue running for a bit to show potential migration + for i := 0; i < 5; i++ { + println("[main] unpinned loop on CPU", machine.CurrentCore()) + time.Sleep(500 * time.Millisecond) + } + + println() + println("Example complete!") +} + +// 
Worker function that pins to core 1 using explicit core selection +func core1Worker() { + // Pin this goroutine to core 1 explicitly + machine.LockCore(1) + println("[core1-worker] Worker pinned to core 1 using machine.LockCore()") + + for i := 0; i < 10; i++ { + println("[core1-worker] loop", i, "on CPU", machine.CurrentCore()) + time.Sleep(500 * time.Millisecond) + } + + println("[core1-worker] Finished") +} + +// Worker function that uses standard Go LockOSThread() +func standardLockWorker() { + // Pin this goroutine to whichever core it starts on + runtime.LockOSThread() + defer runtime.UnlockOSThread() + + core := machine.CurrentCore() + println("[std-lock-worker] Worker locked using runtime.LockOSThread()") + println("[std-lock-worker] Running on core:", core) + + for i := 0; i < 10; i++ { + println("[std-lock-worker] loop", i, "on CPU", machine.CurrentCore()) + time.Sleep(600 * time.Millisecond) + } + + println("[std-lock-worker] Finished") +} + +// Worker function that is not pinned (can run on any core) +func unpinnedWorker() { + println("[unpinned-worker] Starting") + + for i := 0; i < 10; i++ { + cpu := machine.CurrentCore() + println("[unpinned-worker] loop", i, "on CPU", cpu) + time.Sleep(700 * time.Millisecond) + + // Yield to potentially migrate to another core + runtime.Gosched() + } + + println("[unpinned-worker] Finished") +} diff --git a/src/internal/task/task.go b/src/internal/task/task.go index e257e1bc8e..40b0c3f3ae 100644 --- a/src/internal/task/task.go +++ b/src/internal/task/task.go @@ -29,6 +29,14 @@ type Task struct { // since it falls into the padding of the FipsIndicator bit above. RunState uint8 + // Affinity specifies which CPU core this task should run on. + // -1 means no affinity (can run on any core) + // 0, 1, etc. means pinned to that specific core + // To be used ONLY with the "cores" scheduler. 
+ // By default, all goroutines are unpinned (Affinity = -1) + // Pinning takes effect at the next scheduling point (e.g., after time.Sleep(), channel operations, or runtime.Gosched()) + Affinity int8 + // DeferFrame stores a pointer to the (stack allocated) defer frame of the // goroutine that is used for the recover builtin. DeferFrame unsafe.Pointer diff --git a/src/machine/machine_rp2_cores.go b/src/machine/machine_rp2_cores.go new file mode 100644 index 0000000000..87e0d1277e --- /dev/null +++ b/src/machine/machine_rp2_cores.go @@ -0,0 +1,48 @@ +//go:build (rp2040 || rp2350) && scheduler.cores + +package machine + +const numCPU = 2 // RP2040 and RP2350 both have 2 cores + +// LockCore sets the affinity for the current goroutine to the specified core. +// This does not immediately migrate the goroutine; migration occurs at the next +// scheduling point. See machine_rp2.go for full documentation. +// +// To avoid potential blocking on a busy core, consider calling LockCore in an +// init function before any other goroutines have started. This guarantees the +// target core is available. +// +// This is useful for: +// - Isolating time-critical operations to a dedicated core +// - Improving cache locality for performance-sensitive code +// - Exclusive access to core-local resources +// +// Warning: Pinning goroutines can lead to load imbalance. The goroutine will +// wait in the specified core's queue even if other cores are idle. If a +// long-running goroutine occupies the target core, LockCore may appear to +// block indefinitely (until the next scheduling point on the target core). +func LockCore(core int) { + if core < 0 || core >= numCPU { + panic("machine: core out of range") + } + machineLockCore(core) +} + +// UnlockCore unpins the calling goroutine, allowing it to run on any available core. +// This undoes a previous call to LockCore. 
+// +// After calling UnlockCore, the scheduler is free to schedule the goroutine on +// any core for automatic load balancing. +// +// Only available on RP2040 and RP2350 with the "cores" scheduler. +func UnlockCore() { + machineUnlockCore() +} + +// Internal functions implemented in runtime/scheduler_cores.go +// +//go:linkname machineLockCore runtime.machineLockCore +func machineLockCore(core int) + +//go:linkname machineUnlockCore runtime.machineUnlockCore +func machineUnlockCore() diff --git a/src/machine/machine_rp2_nocores.go b/src/machine/machine_rp2_nocores.go new file mode 100644 index 0000000000..ac22544938 --- /dev/null +++ b/src/machine/machine_rp2_nocores.go @@ -0,0 +1,15 @@ +//go:build (rp2040 || rp2350) && !scheduler.cores + +package machine + +// LockCore is not available without the cores scheduler. +// This is a stub that panics. +func LockCore(core int) { + panic("machine.LockCore: not available without scheduler.cores") +} + +// UnlockCore is not available without the cores scheduler. +// This is a stub that panics. +func UnlockCore() { + panic("machine.UnlockCore: not available without scheduler.cores") +} diff --git a/src/runtime/runtime.go b/src/runtime/runtime.go index c9b0959384..eb97b1c1f5 100644 --- a/src/runtime/runtime.go +++ b/src/runtime/runtime.go @@ -98,14 +98,23 @@ func os_sigpipe() { } // LockOSThread wires the calling goroutine to its current operating system thread. -// Stub for now +// On microcontrollers with multiple cores (e.g., RP2040/RP2350) running the +// "cores" scheduler, this pins the goroutine to the core it is currently +// running on. The pinning takes effect at the next scheduling point (e.g., a +// channel operation, time.Sleep, or Gosched). On single-threaded schedulers +// this remains a no-op.
// Called by go1.18 standard library on windows, see https://github.com/golang/go/issues/49320 func LockOSThread() { + lockOSThreadImpl() } // UnlockOSThread undoes an earlier call to LockOSThread. -// Stub for now +// On microcontrollers with multiple cores running the "cores" scheduler, this +// unpins the goroutine, allowing the scheduler to run it on any available +// core from the next scheduling point onward. On single-threaded schedulers +// this remains a no-op. func UnlockOSThread() { + unlockOSThreadImpl() } // KeepAlive makes sure the value in the interface is alive until at least the diff --git a/src/runtime/scheduler_cooperative.go index 274daa84d5..56f7c3b8c7 100644 --- a/src/runtime/scheduler_cooperative.go +++ b/src/runtime/scheduler_cooperative.go @@ -261,6 +261,16 @@ func unlockAtomics(mask interrupt.State) { interrupt.Restore(mask) } +// lockOSThreadImpl is a no-op for the cooperative scheduler (single-threaded). +func lockOSThreadImpl() { + // Single-threaded, nothing to do. +} + +// unlockOSThreadImpl is a no-op for the cooperative scheduler (single-threaded). +func unlockOSThreadImpl() { + // Single-threaded, nothing to do. +} + func printlock() { // nothing to do } diff --git a/src/runtime/scheduler_cores.go index c2736cafd2..ac811a8529 100644 --- a/src/runtime/scheduler_cores.go +++ b/src/runtime/scheduler_cores.go @@ -22,8 +22,9 @@ var secondaryCoresStarted bool var cpuTasks [numCPU]*task.Task var ( - sleepQueue *task.Task - runqueue task.Queue + sleepQueue *task.Task + runqueueShared task.Queue // For unpinned tasks (affinity = -1) + runqueueCore [numCPU]task.Queue // Per-core queues for pinned tasks ) func deadlock() { @@ -39,8 +40,14 @@ func scheduleTask(t *task.Task) { switch t.RunState { case task.RunStatePaused: // Paused, state is saved on the stack. - // Add it to the runqueue... - runqueue.Push(t) + // Route to appropriate queue based on affinity. 
+ if t.Affinity >= 0 && t.Affinity < numCPU { + // Pinned to specific core + runqueueCore[t.Affinity].Push(t) + } else { + // Not pinned, use shared queue + runqueueShared.Push(t) + } // ...and wake up a sleeping core, if there is one. // (If all cores are already busy, this is a no-op). schedulerWake() @@ -86,7 +93,15 @@ func addSleepTask(t *task.Task, wakeup timeUnit) { func Gosched() { schedulerLock.Lock() - runqueue.Push(task.Current()) + t := task.Current() + + // Respect affinity when re-queueing. + if t.Affinity >= 0 && t.Affinity < numCPU { + runqueueCore[t.Affinity].Push(t) + } else { + runqueueShared.Push(t) + } + task.PauseLocked() } @@ -95,6 +110,50 @@ func NumCPU() int { return numCPU } +// +// Warning: Pinning goroutines can lead to load imbalance. The goroutine will +// wait in the specified core's queue even if other cores are idle. Use this +// feature carefully and only when you need explicit core affinity. +// +// Valid core values are 0..numCPU-1; machine.LockCore validates the range. +// + +// machineLockCore pins the current goroutine to the specified CPU core. +// This is called by machine.LockCore() on RP2040/RP2350. +// It does not validate the core number - validation is done in machine package. +func machineLockCore(core int) { + schedulerLock.Lock() + t := task.Current() + if t != nil { + t.Affinity = int8(core) + } + schedulerLock.Unlock() + Gosched() +} + +// machineUnlockCore unpins the current goroutine. +// This is called by machine.UnlockCore() on RP2040/RP2350. +func machineUnlockCore() { + schedulerLock.Lock() + t := task.Current() + if t != nil { + t.Affinity = -1 + } + schedulerLock.Unlock() +} + +// lockOSThreadImpl implements LockOSThread for the cores scheduler. +// It pins the current goroutine to whichever core it's currently running on. +func lockOSThreadImpl() { + core := int(currentCPU()) + machineLockCore(core) +} + +// unlockOSThreadImpl implements UnlockOSThread for the cores scheduler. 
+func unlockOSThreadImpl() { + machineUnlockCore() +} + func addTimer(tn *timerNode) { schedulerLock.Lock() timerQueueAdd(tn) @@ -110,7 +169,7 @@ func removeTimer(t *timer) *timerNode { } func schedulerRunQueue() *task.Queue { - return &runqueue + return &runqueueShared } // Pause the current task for a given time. @@ -160,9 +219,33 @@ func run() { } func scheduler(_ bool) { + currentCore := int(currentCPU()) + for mainExited.Load() == 0 { // Check for ready-to-run tasks. - if runnable := runqueue.Pop(); runnable != nil { + // First, try to get a task pinned to this core. + var runnable *task.Task + if currentCore < numCPU { + runnable = runqueueCore[currentCore].Pop() + } + + // If no pinned tasks, try the shared queue. + if runnable == nil { + runnable = runqueueShared.Pop() + } + + if runnable != nil { + // Verify affinity constraint (sanity check). + if runnable.Affinity >= 0 && runnable.Affinity != int8(currentCore) { + // Shouldn't happen, but put it back on correct queue. + if runnable.Affinity < numCPU { + runqueueCore[runnable.Affinity].Push(runnable) + } else { + runqueueShared.Push(runnable) + } + continue + } + // Resume it now. setCurrentTask(runnable) runnable.RunState = task.RunStateRunning @@ -183,6 +266,19 @@ func scheduler(_ bool) { sleepQueue = sleepQueue.Next sleepingTask.Next = nil + // Check affinity before running. + if sleepingTask.Affinity >= 0 && sleepingTask.Affinity != int8(currentCore) { + // Task is pinned to a different core, re-queue it. + sleepingTask.RunState = task.RunStatePaused + if sleepingTask.Affinity < numCPU { + runqueueCore[sleepingTask.Affinity].Push(sleepingTask) + } else { + runqueueShared.Push(sleepingTask) + } + schedulerWake() + continue + } + // Run it now. 
setCurrentTask(sleepingTask) sleepingTask.RunState = task.RunStateRunning diff --git a/src/runtime/scheduler_none.go b/src/runtime/scheduler_none.go index 06722afcf8..c938b5c781 100644 --- a/src/runtime/scheduler_none.go +++ b/src/runtime/scheduler_none.go @@ -85,6 +85,16 @@ func unlockAtomics(mask interrupt.State) { interrupt.Restore(mask) } +// lockOSThreadImpl is a no-op for the "none" scheduler (single goroutine). +func lockOSThreadImpl() { + // Only one goroutine exists, nothing to do. +} + +// unlockOSThreadImpl is a no-op for the "none" scheduler (single goroutine). +func unlockOSThreadImpl() { + // Only one goroutine exists, nothing to do. +} + func printlock() { // nothing to do } diff --git a/src/runtime/scheduler_threads.go b/src/runtime/scheduler_threads.go index 6d41d0c99e..52a3ab3782 100644 --- a/src/runtime/scheduler_threads.go +++ b/src/runtime/scheduler_threads.go @@ -157,3 +157,13 @@ func lockAtomics() interrupt.State { func unlockAtomics(mask interrupt.State) { atomicsLock.Unlock() } + +// lockOSThreadImpl is currently an unimplemented stub for the threads scheduler. +func lockOSThreadImpl() { + // TODO: wire the calling goroutine to its OS thread. +} + +// unlockOSThreadImpl is currently an unimplemented stub for the threads scheduler. +func unlockOSThreadImpl() { + // TODO: undo the effect of lockOSThreadImpl. +}