Handle synchronous and asynchronous tasks enqueued in the background using individual workers or worker pools. The worker package provides a simple, configurable task queue; workers process tasks in FIFO order with configurable rate limiting.
- Single Worker: Process tasks sequentially with rate limiting
- Worker Pool: Distribute tasks across multiple workers for parallel processing
- Unified Interface: Both Worker and WorkerPool implement the WorkProcessor interface
- Graceful Shutdown: All pending tasks are processed before stopping
- Error Handling: Custom error handlers for task failures
- Flexible Configuration: Rate limiting, timeouts, and burst capacity
// Create a worker with custom configuration
w, err := worker.New(worker.Config{
	Rate:    10,          // 10 tasks per second
	Burst:   5,           // Buffer up to 5 tasks
	Timeout: time.Second, // 1 second timeout for operations
})
if err != nil {
	log.Fatal(err)
}
defer worker.Stop(w)
All operations are performed through command functions:
// Enqueue asynchronously; returns once the task is queued
err := worker.Enqueue(w, func() error {
	// Do work
	return nil
})
// Enqueue and wait; blocks until the task has completed
err := worker.EnqueueWaiting(w, func() error {
	// Do work
	return nil
})
// Enqueue and await later; returns an awaiter function
awaiter, err := worker.EnqueueAwaiting(w, func() error {
	// Do work
	return nil
}, 5*time.Second)
// Do other work...
err = awaiter() // Wait for completion
err := worker.Stop(w)
The worker will process all pending tasks before stopping. Error handling can be customized through the ErrorHandler in the configuration.
Create and use a pool of workers for parallel task processing:
// Create a pool with 5 workers
pool, err := worker.NewWorkerPool(5, worker.DefaultConfig())
if err != nil {
	log.Fatal(err)
}
defer worker.Stop(pool)
// Enqueue tasks - they'll be distributed across workers
worker.Enqueue(pool, func() error {
	fmt.Println("Task executed by one of the pool workers")
	return nil
})
Both Worker and WorkerPool implement the WorkProcessor interface internally. The package-level functions accept this interface:
// All these functions work with both Worker and WorkerPool:
worker.Enqueue(processor, task)
worker.EnqueueWaiting(processor, task)
worker.EnqueueAwaiting(processor, task, timeout)
worker.Stop(processor)
This allows you to write code that works with both:
func processTasksWithAnyProcessor(wp worker.WorkProcessor) {
	// Works with both Worker and WorkerPool!
	worker.Enqueue(wp, func() error {
		fmt.Println("Processing task...")
		return nil
	})
}
// Use with single worker
w, _ := worker.New(worker.DefaultConfig())
processTasksWithAnyProcessor(w)
// Use with pool
pool, _ := worker.NewWorkerPool(5, worker.DefaultConfig())
processTasksWithAnyProcessor(pool)
The Config struct allows you to customize worker behavior:
type Config struct {
	Rate         int           // Tasks per second (0 = unlimited)
	Burst        int           // Maximum burst size for rate limiter
	Timeout      time.Duration // Timeout for operations
	ErrorHandler func(error)   // Custom error handler (optional)
}
Use worker.DefaultConfig() for sensible defaults.
By default, errors are logged. You can provide a custom error handler:
cfg := worker.DefaultConfig()
cfg.ErrorHandler = func(err error) {
	// Custom error handling logic
	log.Printf("Task error: %v", err)
}
w, _ := worker.New(cfg)
Use a single Worker when:
- Tasks must be processed in order
- Rate limiting is more important than throughput
- Resource usage needs to be minimal

Use a WorkerPool when:
- Tasks can be processed in parallel
- Higher throughput is needed
- You have CPU-bound or I/O-bound tasks that benefit from concurrency
package main

import (
	"fmt"
	"log"
	"time"

	"tideland.dev/go/worker"
)

func main() {
	// Create worker
	w, err := worker.New(worker.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}
	defer worker.Stop(w)

	// Enqueue some tasks
	for i := 0; i < 10; i++ {
		taskNum := i
		worker.Enqueue(w, func() error {
			fmt.Printf("Processing task %d\n", taskNum)
			time.Sleep(100 * time.Millisecond)
			return nil
		})
	}

	// Wait a bit for tasks to complete
	time.Sleep(2 * time.Second)
}
package main

import (
	"fmt"
	"log"
	"sync/atomic"
	"time"

	"tideland.dev/go/worker"
)

func main() {
	// Create a pool with 3 workers
	pool, err := worker.NewWorkerPool(3, worker.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}
	defer worker.Stop(pool)

	var completed int32

	// Enqueue 20 tasks
	for i := 0; i < 20; i++ {
		taskNum := i
		worker.Enqueue(pool, func() error {
			fmt.Printf("Worker processing task %d\n", taskNum)
			time.Sleep(500 * time.Millisecond)
			atomic.AddInt32(&completed, 1)
			return nil
		})
	}

	// Wait for all tasks to complete
	for atomic.LoadInt32(&completed) < 20 {
		time.Sleep(100 * time.Millisecond)
	}
	fmt.Println("All tasks completed!")
}
- Frank Mueller (https://github.com/themue / https://github.com/tideland / https://themue.dev)