diff --git a/README.md b/README.md index db03e4e9..5cba2607 100644 --- a/README.md +++ b/README.md @@ -238,6 +238,18 @@ plugin :solid_queue ``` to your `puma.rb` configuration. +### Forking or asynchronous workers + +By default, the Puma plugin will fork additional processes for each worker and dispatcher so that they run in different processes. This provides the best isolation and performance, but can have additional memory usage. + +Alternatively, workers and dispatchers can be run within the same Puma process(s). To do so, + +1. Configure the plugin as: + ```ruby + plugin :solid_queue + solid_queue_mode :async + ```` +2. Opt-in specific worker configurations with `processes: 0` in `config/solid_queue.yml` ## Jobs and transactional integrity :warning: Having your jobs in the same ACID-compliant database as your application data enables a powerful yet sharp tool: taking advantage of transactional integrity to ensure some action in your app is not committed unless your job is also committed. This can be very powerful and useful, but it can also backfire if you base some of your logic on this behaviour, and in the future, you move to another active job backend, or if you simply move Solid Queue to its own database, and suddenly the behaviour changes under you. diff --git a/lib/puma/plugin/solid_queue.rb b/lib/puma/plugin/solid_queue.rb index 4a4d0d54..edd912d0 100644 --- a/lib/puma/plugin/solid_queue.rb +++ b/lib/puma/plugin/solid_queue.rb @@ -1,5 +1,13 @@ require "puma/plugin" +module Puma + class DSL + def solid_queue_mode(mode = :fork) + @options[:solid_queue_mode] = mode.to_sym + end + end +end + Puma::Plugin.create do attr_reader :puma_pid, :solid_queue_pid, :log_writer @@ -7,21 +15,37 @@ def start(launcher) @log_writer = launcher.log_writer @puma_pid = $$ - launcher.events.on_booted do - @solid_queue_pid = fork do - Thread.new { monitor_puma } - SolidQueue::Supervisor.start(mode: :all) - end - in_background do - monitor_solid_queue - end + if launcher.options[:solid_queue_mode] == :async + start_async(launcher) + else + start_forked(launcher) end - - launcher.events.on_stopped { stop_solid_queue } end private + def start_forked(launcher) + launcher.events.on_booted do + @solid_queue_pid = fork do + Thread.new { monitor_puma } + SolidQueue::Supervisor.start(mode: :all) + end + + in_background do + monitor_solid_queue + end + end + + launcher.events.on_stopped { stop_solid_queue } + end + + def start_async(launcher) + supervisor = SolidQueue::AsyncSupervisor.load + launcher.events.on_booted { supervisor.start } + launcher.events.on_stopped { supervisor.stop } + launcher.events.on_restart { supervisor.stop; supervisor.start } + end + def stop_solid_queue Process.waitpid(solid_queue_pid, Process::WNOHANG) log "Stopping Solid Queue..." diff --git a/lib/solid_queue.rb b/lib/solid_queue.rb index 69243e84..b72b99c7 100644 --- a/lib/solid_queue.rb +++ b/lib/solid_queue.rb @@ -7,6 +7,7 @@ require "active_job/concurrency_controls" require "solid_queue/app_executor" +require "solid_queue/async_supervisor" require "solid_queue/processes/supervised" require "solid_queue/processes/registrable" require "solid_queue/processes/interruptible" diff --git a/lib/solid_queue/async_supervisor.rb b/lib/solid_queue/async_supervisor.rb new file mode 100644 index 00000000..39879a3b --- /dev/null +++ b/lib/solid_queue/async_supervisor.rb @@ -0,0 +1,22 @@ +module SolidQueue + class AsyncSupervisor + class << self + def load(load_configuration_from: nil) + configuration = Configuration.new(mode: :async, load_from: load_configuration_from) + new(*configuration.processes) + end + end + + def initialize(*configured_processes) + @configured_processes = Array(configured_processes) + end + + def start + @configured_processes.each(&:start) + end + + def stop + @configured_processes.each(&:stop) + end + end +end diff --git a/lib/solid_queue/configuration.rb b/lib/solid_queue/configuration.rb index 41bd64e1..8232b407 100644 --- a/lib/solid_queue/configuration.rb +++ b/lib/solid_queue/configuration.rb @@ -25,6 +25,7 @@ def processes when :dispatch then dispatchers when :work then workers when :all then dispatchers + workers + when :async then dispatchers + async_workers else raise "Invalid mode #{mode}" end end @@ -40,8 +41,16 @@ def workers end end + def async_workers + workers_options.select do |worker_options| + worker_options.fetch(:processes, WORKER_DEFAULTS[:processes]) == 0 + end.map do |worker_options| + SolidQueue::Worker.new(**worker_options.with_defaults(WORKER_DEFAULTS)) + end + end + def dispatchers - if mode.in? %i[ dispatch all] + if mode.in? %i[ dispatch all async] dispatchers_options.flat_map do |dispatcher_options| SolidQueue::Dispatcher.new(**dispatcher_options) end diff --git a/test/dummy/config/puma.rb b/test/dummy/config/puma.rb index d4f1ae10..beb65259 100644 --- a/test/dummy/config/puma.rb +++ b/test/dummy/config/puma.rb @@ -42,3 +42,5 @@ # Allow puma to be restarted by `bin/rails restart` command. plugin :tmp_restart plugin :solid_queue + +solid_queue_mode :async diff --git a/test/dummy/config/solid_queue.yml b/test/dummy/config/solid_queue.yml index cce3e9cd..d673316e 100644 --- a/test/dummy/config/solid_queue.yml +++ b/test/dummy/config/solid_queue.yml @@ -4,6 +4,9 @@ default: &default threads: 3 - queues: default threads: 5 + - queues: [default, background] + threads: 2 + processes: 0 # async dispatchers: - polling_interval: 1 batch_size: 500