Merged

33 commits
1ddfcb3
Remove `mode` option from configuration and rely on the config values
rosa Jun 25, 2024
386250a
Move mode checks from Poller to Runnable, where it should live
rosa Jun 26, 2024
b166bad
Move Signals module under Supervisor namespace, out of Processes
rosa Jun 26, 2024
b10d838
Change `supervised` to not be a mode
rosa Jun 26, 2024
2ed6abe
Start extracting forking behaviour from Supervisor
rosa Jun 26, 2024
bb44f36
Rename `forks` association to `supervisees`
rosa Jul 1, 2024
9a38f5f
Extract Timer for Supervisor's timed tasks
rosa Jul 3, 2024
faefbba
Fix race condition in test with recurring jobs getting enqueued
rosa Jul 22, 2024
601266e
Extract rest of forking behaviour from Supervisor to another subclass
rosa Jul 22, 2024
256be75
Rename `Supervisor::Forks` to `Supervisor::ForkSupervisor`
rosa Jul 22, 2024
c7caf01
Extract Pidfile setup and deletion to a different concern
rosa Jul 22, 2024
f6d980f
Simplify termination handling for forked supervisor
rosa Jul 23, 2024
32403b7
Emit claim events when claiming jobs
rosa Jul 23, 2024
d3a8a56
Fix and simplify logging of recurring tasks enqueuing
rosa Jul 23, 2024
11b0848
Rename `Single` concern to `Pidfiled`
rosa Jul 23, 2024
347451c
Call configured `on_thread_error` on thread errors when running async
rosa Jul 23, 2024
93eb0cf
Move Supervisor methods around to prepare for async supervisor
rosa Jul 24, 2024
ef21678
Remove unused options and simplify methods for fork supervisor tests
rosa Jul 24, 2024
9d81cc7
Guard against the case of maintenance tasks not having started before…
rosa Jul 24, 2024
18c12c5
Implement a first async supervisor and some tests for it
rosa Jul 24, 2024
eb026d3
Release orphaned executions just once, before booting the supervisor
rosa Jul 25, 2024
a02be31
Add tests to terminate and kill individual workers
rosa Jul 25, 2024
fcb7e76
Instrument starting and shutting down supervisor
rosa Jul 25, 2024
530a50b
Include process_id on process instrumentation events when possible
rosa Jul 25, 2024
e93e23d
Don't try to release claimed executions for dispatchers and supervisors
rosa Jul 25, 2024
16d32cb
Spare a query to claimed_executions when deregistering a dispatcher o…
rosa Jul 25, 2024
057977b
Use default worker/dispatcher only when the configuration is empty
rosa Jul 29, 2024
5907948
Port changes by @bensheldon to run Solid Queue in the same process as…
rosa Jul 29, 2024
8342d8e
Run tests for Puma plugin with Solid Queue as fork and async
rosa Jul 29, 2024
2a0f11b
Add instructions about upgrading to versions 0.4.x and 0.3.x
rosa Jul 29, 2024
c867ce8
Ignore `processes` option when running in async mode
rosa Jul 29, 2024
b119337
Update README with last changes about async mode
rosa Jul 29, 2024
cdd87ef
Fix procline for new Supervisor subclasses
rosa Jul 30, 2024
35 changes: 30 additions & 5 deletions README.md
@@ -66,7 +66,7 @@ $ bundle exec rake solid_queue:start

This will start processing jobs in all queues using the default configuration. See [below](#configuration) to learn more about configuring Solid Queue.

For small projects, you can run Solid Queue on the same machine as your webserver. When you're ready to scale, Solid Queue supports horizontal scaling out-of-the-box. You can run Solid Queue on a separate server from your webserver, or even run `bundle exec rake solid_queue:start` on multiple machines at the same time. If you'd like to designate some machines to be only dispatchers or only workers, use `bundle exec rake solid_queue:dispatch` or `bundle exec rake solid_queue:work`, respectively.
For small projects, you can run Solid Queue on the same machine as your webserver. When you're ready to scale, Solid Queue supports horizontal scaling out-of-the-box. You can run Solid Queue on a separate server from your webserver, or even run `bundle exec rake solid_queue:start` on multiple machines at the same time. Depending on the configuration, you can designate some machines to run only dispatchers or only workers. See the [configuration](#configuration) section for more details on this.

## Requirements
Besides Rails 7.1, Solid Queue works best with MySQL 8+ or PostgreSQL 9.5+, as they support `FOR UPDATE SKIP LOCKED`. You can use it with older versions, but in that case, you might run into lock waits if you run multiple workers for the same queue.
@@ -75,10 +75,12 @@ Besides Rails 7.1, Solid Queue works best with MySQL 8+ or PostgreSQL 9.5+, as t

### Workers and dispatchers

We have three types of processes in Solid Queue:
We have three types of actors in Solid Queue:
- _Workers_ are in charge of picking jobs ready to run from queues and processing them. They work off the `solid_queue_ready_executions` table.
- _Dispatchers_ are in charge of selecting jobs scheduled to run in the future that are due and _dispatching_ them, which is simply moving them from the `solid_queue_scheduled_executions` table over to the `solid_queue_ready_executions` table so that workers can pick them up. They're also in charge of managing [recurring tasks](#recurring-tasks), dispatching jobs to process them according to their schedule. On top of that, they do some maintenance work related to [concurrency controls](#concurrency-controls).
- The _supervisor_ forks workers and dispatchers according to the configuration, controls their heartbeats, and sends them signals to stop and start them when needed.
- The _supervisor_ runs workers and dispatchers according to the configuration, controls their heartbeats, and stops and starts them when needed.

By default, Solid Queue runs in `fork` mode. This means the supervisor will fork a separate process for each supervised worker/dispatcher. There's also an `async` mode, where each worker and dispatcher runs as a thread of the supervisor process. This can be used with [the provided Puma plugin](#puma-plugin).
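
For illustration, this is roughly what the two modes boil down to (a sketch based on the Puma plugin integration described [below](#puma-plugin), not a documented public entry point):

```ruby
# Sketch only: the mode names mirror the Puma plugin integration further down;
# the exact programmatic entry point may differ.

# fork mode (default): the supervisor forks one OS process per configured worker/dispatcher
SolidQueue::Supervisor.start(mode: :fork)

# async mode: workers and dispatchers run as threads inside the supervisor's own process
SolidQueue::Supervisor.start(mode: :async)
```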

By default, Solid Queue will try to find your configuration under `config/solid_queue.yml`, but you can set a different path using the environment variable `SOLID_QUEUE_CONFIG`. This is what this configuration looks like:

@@ -98,7 +100,18 @@ production:
processes: 3
```

Everything is optional. If no configuration is provided, Solid Queue will run with one dispatcher and one worker with default settings.
Everything is optional. If no configuration at all is provided, Solid Queue will run with one dispatcher and one worker with default settings. If you want to run only dispatchers or workers, you just need to include that section alone in the configuration. For example, with the following configuration:

```yml
production:
dispatchers:
- polling_interval: 1
batch_size: 500
concurrency_maintenance_interval: 300
```
the supervisor will run 1 dispatcher and no workers.

Here's an overview of the different options:

- `polling_interval`: the time interval in seconds that workers and dispatchers will wait before checking for more jobs. This time defaults to `1` second for dispatchers and `0.1` seconds for workers.
- `batch_size`: the dispatcher will dispatch jobs in batches of this size. The default is 500.
@@ -118,7 +131,7 @@ Everything is optional. If no configuration is provided, Solid Queue will run wi

Finally, you can combine prefixes with exact names, like `[ staging*, background ]`, and the behaviour with respect to order will be the same as with only exact names.
- `threads`: this is the max size of the thread pool that each worker will have to run jobs. Each worker will fetch at most this number of jobs from their queue(s) and will post them to the thread pool to be run. By default, this is `3`. Only workers have this setting.
- `processes`: this is the number of worker processes that will be forked by the supervisor with the settings given. By default, this is `1`, just a single process. This setting is useful if you want to dedicate more than one CPU core to a queue or queues with the same configuration. Only workers have this setting.
- `processes`: this is the number of worker processes that will be forked by the supervisor with the settings given. By default, this is `1`, just a single process. This setting is useful if you want to dedicate more than one CPU core to a queue or queues with the same configuration. Only workers have this setting. **Note**: this option will be ignored if [running in `async` mode](#running-as-a-fork-or-asynchronously).
- `concurrency_maintenance`: whether the dispatcher will perform the concurrency maintenance work. This is `true` by default, and it's useful if you don't use any [concurrency controls](#concurrency-controls) and want to disable it or if you run multiple dispatchers and want some of them to just dispatch jobs without doing anything else.
- `recurring_tasks`: a list of recurring tasks the dispatcher will manage. Read more details about this one in the [Recurring tasks](#recurring-tasks) section.

@@ -291,6 +304,18 @@ plugin :solid_queue
```
to your `puma.rb` configuration.

### Running as a fork or asynchronously

By default, the Puma plugin will fork additional processes so that each worker and dispatcher runs in its own process. This provides the best isolation and performance, at the cost of additional memory usage.

Alternatively, workers and dispatchers can run within the same Puma process(es). To do so, configure the plugin as:

```ruby
plugin :solid_queue
solid_queue_mode :async
```

Note that in this case, the `processes` configuration option will be ignored.

## Jobs and transactional integrity
:warning: Having your jobs in the same ACID-compliant database as your application data enables a powerful yet sharp tool: taking advantage of transactional integrity to ensure some action in your app is not committed unless your job is also committed. This can be very powerful and useful, but it can also backfire if you base some of your logic on this behaviour, and in the future, you move to another active job backend, or if you simply move Solid Queue to its own database, and suddenly the behaviour changes under you.
42 changes: 42 additions & 0 deletions UPGRADING.md
@@ -0,0 +1,42 @@
# Upgrading to version 0.4.x
This version introduced an _async_ mode to run the supervisor, where all workers and dispatchers run as part of the same process as the supervisor instead of as separate, forked processes. Together with this, we introduced some changes in how the supervisor is started. Prior to this change, you could choose whether to run workers, dispatchers or both by starting Solid Queue as `solid_queue:work` or `solid_queue:dispatch`. From version 0.4.0, the only option available is:

```
$ bundle exec rake solid_queue:start
```
Whether the supervisor starts workers, dispatchers or both will depend on your configuration. For example, if you don't configure any dispatchers, only workers will be started. That is, with this configuration:

```yml
production:
workers:
- queues: [ real_time, background ]
threads: 5
polling_interval: 0.1
processes: 3
```
the supervisor will run 3 workers, each with 5 threads, and no dispatchers. With this configuration:
```yml
production:
dispatchers:
- polling_interval: 1
batch_size: 500
concurrency_maintenance_interval: 300
```
the supervisor will run 1 dispatcher and no workers.


# Upgrading to version 0.3.x

This version introduced support for [recurring (cron-style) jobs](https://github.com/rails/solid_queue/blob/main/README.md#recurring-tasks), and it needs a new DB migration for it. To install it, just run:

```bash
$ bin/rails solid_queue:install:migrations
```

Or, if you're using a different database for Solid Queue:

```bash
$ bin/rails solid_queue:install:migrations DATABASE=<the_name_of_your_solid_queue_db>
```

And then run the migrations.
11 changes: 8 additions & 3 deletions app/models/solid_queue/claimed_execution.rb
@@ -15,9 +15,14 @@ class << self
def claiming(job_ids, process_id, &block)
job_data = Array(job_ids).collect { |job_id| { job_id: job_id, process_id: process_id } }

insert_all!(job_data)
where(job_id: job_ids, process_id: process_id).load.tap do |claimed|
block.call(claimed)
SolidQueue.instrument(:claim, process_id: process_id, job_ids: job_ids) do |payload|
insert_all!(job_data)
where(job_id: job_ids, process_id: process_id).load.tap do |claimed|
block.call(claimed)

payload[:size] = claimed.size
payload[:claimed_job_ids] = claimed.map(&:job_id)
end
end
end
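
The claim events added above can be consumed like any other instrumentation payload. Here's a minimal sketch, assuming `SolidQueue.instrument` publishes regular `ActiveSupport::Notifications` events under the `solid_queue` namespace (the `claim.solid_queue` event name is an assumption; the payload keys come from the diff above):

```ruby
# Hypothetical subscriber; adjust the event name if the namespace differs.
ActiveSupport::Notifications.subscribe("claim.solid_queue") do |_name, _start, _finish, _id, payload|
  Rails.logger.info(
    "Process #{payload[:process_id]} claimed #{payload[:size]} of #{Array(payload[:job_ids]).size} job(s)"
  )
end
```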

19 changes: 10 additions & 9 deletions app/models/solid_queue/process.rb
@@ -1,19 +1,18 @@
# frozen_string_literal: true

class SolidQueue::Process < SolidQueue::Record
include Prunable
include Executor, Prunable

belongs_to :supervisor, class_name: "SolidQueue::Process", optional: true, inverse_of: :forks
has_many :forks, class_name: "SolidQueue::Process", inverse_of: :supervisor, foreign_key: :supervisor_id, dependent: :destroy
has_many :claimed_executions
belongs_to :supervisor, class_name: "SolidQueue::Process", optional: true, inverse_of: :supervisees
has_many :supervisees, class_name: "SolidQueue::Process", inverse_of: :supervisor, foreign_key: :supervisor_id, dependent: :destroy

store :metadata, coder: JSON

after_destroy -> { claimed_executions.release_all }

def self.register(**attributes)
SolidQueue.instrument :register_process, **attributes do
create!(attributes.merge(last_heartbeat_at: Time.current))
SolidQueue.instrument :register_process, **attributes do |payload|
create!(attributes.merge(last_heartbeat_at: Time.current)).tap do |process|
payload[:process_id] = process.id
end
end
rescue Exception => error
SolidQueue.instrument :register_process, **attributes.merge(error: error)
@@ -25,7 +24,9 @@ def heartbeat
end

def deregister(pruned: false)
SolidQueue.instrument :deregister_process, process: self, pruned: pruned, claimed_size: claimed_executions.size do |payload|
SolidQueue.instrument :deregister_process, process: self, pruned: pruned do |payload|
payload[:claimed_size] = claimed_executions.size if claims_executions?

destroy!
rescue Exception => error
payload[:error] = error
20 changes: 20 additions & 0 deletions app/models/solid_queue/process/executor.rb
@@ -0,0 +1,20 @@
# frozen_string_literal: true

module SolidQueue
class Process
module Executor
extend ActiveSupport::Concern

included do
has_many :claimed_executions

after_destroy -> { claimed_executions.release_all }, if: :claims_executions?
end

private
def claims_executions?
kind == "Worker"
end
end
end
end
26 changes: 15 additions & 11 deletions app/models/solid_queue/process/prunable.rb
@@ -1,19 +1,23 @@
# frozen_string_literal: true

module SolidQueue::Process::Prunable
extend ActiveSupport::Concern
module SolidQueue
class Process
module Prunable
extend ActiveSupport::Concern

included do
scope :prunable, -> { where(last_heartbeat_at: ..SolidQueue.process_alive_threshold.ago) }
end
included do
scope :prunable, -> { where(last_heartbeat_at: ..SolidQueue.process_alive_threshold.ago) }
end

class_methods do
def prune
SolidQueue.instrument :prune_processes, size: 0 do |payload|
prunable.non_blocking_lock.find_in_batches(batch_size: 50) do |batch|
payload[:size] += batch.size
class_methods do
def prune
SolidQueue.instrument :prune_processes, size: 0 do |payload|
prunable.non_blocking_lock.find_in_batches(batch_size: 50) do |batch|
payload[:size] += batch.size

batch.each { |process| process.deregister(pruned: true) }
batch.each { |process| process.deregister(pruned: true) }
end
end
end
end
end
44 changes: 33 additions & 11 deletions lib/puma/plugin/solid_queue.rb
@@ -1,28 +1,50 @@
require "puma/plugin"

module Puma
class DSL
def solid_queue_mode(mode = :fork)
@options[:solid_queue_mode] = mode.to_sym
end
end
end

Puma::Plugin.create do
attr_reader :puma_pid, :solid_queue_pid, :log_writer
attr_reader :puma_pid, :solid_queue_pid, :log_writer, :solid_queue_supervisor

def start(launcher)
@log_writer = launcher.log_writer
@puma_pid = $$

in_background do
monitor_solid_queue
if launcher.options[:solid_queue_mode] == :async
start_async(launcher)
else
start_forked(launcher)
end
end

launcher.events.on_booted do
@solid_queue_pid = fork do
Thread.new { monitor_puma }
SolidQueue::Supervisor.start(mode: :all)
private
def start_forked(launcher)
in_background do
monitor_solid_queue
end

launcher.events.on_booted do
@solid_queue_pid = fork do
Thread.new { monitor_puma }
SolidQueue::Supervisor.start(mode: :fork)
end
end

launcher.events.on_stopped { stop_solid_queue }
launcher.events.on_restart { stop_solid_queue }
end

launcher.events.on_stopped { stop_solid_queue }
launcher.events.on_restart { stop_solid_queue }
end
def start_async(launcher)
launcher.events.on_booted { @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async) }
launcher.events.on_stopped { solid_queue_supervisor.stop }
launcher.events.on_restart { solid_queue_supervisor.stop; solid_queue_supervisor.start }
end

private
def stop_solid_queue
Process.waitpid(solid_queue_pid, Process::WNOHANG)
log "Stopping Solid Queue..."
40 changes: 18 additions & 22 deletions lib/solid_queue/configuration.rb
@@ -17,38 +17,30 @@ class Configuration
recurring_tasks: []
}

def initialize(mode: :work, load_from: nil)
@mode = mode
def initialize(mode: :fork, load_from: nil)
@mode = mode.to_s.inquiry
@raw_config = config_from(load_from)
end

def processes
case mode
when :dispatch then dispatchers
when :work then workers
when :all then dispatchers + workers
else raise "Invalid mode #{mode}"
end
dispatchers + workers
end

def workers
if mode.in? %i[ work all]
workers_options.flat_map do |worker_options|
processes = worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
processes.times.map { Worker.new(**worker_options.with_defaults(WORKER_DEFAULTS)) }
workers_options.flat_map do |worker_options|
processes = if mode.fork?
worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
else
WORKER_DEFAULTS[:processes]
end
else
[]
processes.times.map { Worker.new(**worker_options.with_defaults(WORKER_DEFAULTS)) }
end
end

def dispatchers
if mode.in? %i[ dispatch all]
dispatchers_options.map do |dispatcher_options|
recurring_tasks = parse_recurring_tasks dispatcher_options[:recurring_tasks]

Dispatcher.new **dispatcher_options.merge(recurring_tasks: recurring_tasks).with_defaults(DISPATCHER_DEFAULTS)
end
dispatchers_options.map do |dispatcher_options|
recurring_tasks = parse_recurring_tasks dispatcher_options[:recurring_tasks]
Dispatcher.new **dispatcher_options.merge(recurring_tasks: recurring_tasks).with_defaults(DISPATCHER_DEFAULTS)
end
end

@@ -68,15 +60,19 @@ def config_from(file_or_hash, env: Rails.env)
end

def workers_options
@workers_options ||= (raw_config[:workers] || [ WORKER_DEFAULTS ])
@workers_options ||= options_from_raw_config(:workers, WORKER_DEFAULTS)
.map { |options| options.dup.symbolize_keys }
end

def dispatchers_options
@dispatchers_options ||= (raw_config[:dispatchers] || [ DISPATCHER_DEFAULTS ])
@dispatchers_options ||= options_from_raw_config(:dispatchers, DISPATCHER_DEFAULTS)
.map { |options| options.dup.symbolize_keys }
end

def options_from_raw_config(key, defaults)
raw_config.empty? ? [ defaults ] : Array(raw_config[key])
end

def parse_recurring_tasks(tasks)
Array(tasks).map do |id, options|
Dispatcher::RecurringTask.from_configuration(id, **options)
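
As a rough sketch of what the reworked configuration yields, a hypothetical usage (the arguments mirror the constructor in the diff above; the supervisor is what actually builds and consumes this object):

```ruby
# Hypothetical usage, for illustration only.
config = SolidQueue::Configuration.new(mode: :fork, load_from: "config/solid_queue.yml")

# Returns dispatchers + workers built from the YAML sections. In :fork mode each
# worker section expands into `processes` Worker instances; in :async mode that
# option is ignored and a single Worker per section is returned.
config.processes
```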
7 changes: 3 additions & 4 deletions lib/solid_queue/dispatcher.rb
@@ -1,9 +1,7 @@
# frozen_string_literal: true

module SolidQueue
class Dispatcher < Processes::Base
include Processes::Poller

class Dispatcher < Processes::Poller
attr_accessor :batch_size, :concurrency_maintenance, :recurring_schedule

after_boot :start_concurrency_maintenance, :load_recurring_schedule
@@ -13,10 +11,11 @@ def initialize(**options)
options = options.dup.with_defaults(SolidQueue::Configuration::DISPATCHER_DEFAULTS)

@batch_size = options[:batch_size]
@polling_interval = options[:polling_interval]

@concurrency_maintenance = ConcurrencyMaintenance.new(options[:concurrency_maintenance_interval], options[:batch_size]) if options[:concurrency_maintenance]
@recurring_schedule = RecurringSchedule.new(options[:recurring_tasks])

super(**options)
end

def metadata
2 changes: 1 addition & 1 deletion lib/solid_queue/dispatcher/concurrency_maintenance.rb
@@ -25,7 +25,7 @@ def start
end

def stop
@concurrency_maintenance_task.shutdown
@concurrency_maintenance_task&.shutdown
end

private