Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,18 @@ plugin :solid_queue
```
to your `puma.rb` configuration.

### Forking or asynchronous workers

By default, the Puma plugin will fork additional processes for each worker and dispatcher so that they run in different processes. This provides the best isolation and performance, but can have additional memory usage.

Alternatively, workers and dispatchers can be run within the same Puma process(s). To do so,

1. Configure the plugin as:
```ruby
plugin :solid_queue
solid_queue_mode :async
````
2. Opt-in specific worker configurations with `processes: 0` in `config/solid_queue.yml`

## Jobs and transactional integrity
:warning: Having your jobs in the same ACID-compliant database as your application data enables a powerful yet sharp tool: taking advantage of transactional integrity to ensure some action in your app is not committed unless your job is also committed. This can be very powerful and useful, but it can also backfire if you base some of your logic on this behaviour, and in the future, you move to another active job backend, or if you simply move Solid Queue to its own database, and suddenly the behaviour changes under you.
Expand Down
44 changes: 34 additions & 10 deletions lib/puma/plugin/solid_queue.rb
Original file line number Diff line number Diff line change
@@ -1,27 +1,51 @@
require "puma/plugin"

module Puma
class DSL
def solid_queue_mode(mode = :fork)
@options[:solid_queue_mode] = mode.to_sym
end
end
end

Puma::Plugin.create do
attr_reader :puma_pid, :solid_queue_pid, :log_writer

def start(launcher)
@log_writer = launcher.log_writer
@puma_pid = $$

launcher.events.on_booted do
@solid_queue_pid = fork do
Thread.new { monitor_puma }
SolidQueue::Supervisor.start(mode: :all)
end

in_background do
monitor_solid_queue
end
if launcher.options[:solid_queue_mode] == :async
start_async(launcher)
else
start_forked(launcher)
end

launcher.events.on_stopped { stop_solid_queue }
end

private
def start_forked(launcher)
launcher.events.on_booted do
@solid_queue_pid = fork do
Thread.new { monitor_puma }
SolidQueue::Supervisor.start(mode: :all)
end

in_background do
monitor_solid_queue
end
end

launcher.events.on_stopped { stop_solid_queue }
end

def start_async(launcher)
supervisor = SolidQueue::AsyncSupervisor.load
launcher.events.on_booted { supervisor.start }
launcher.events.on_stopped { supervisor.stop }
launcher.events.on_restart { supervisor.stop; supervisor.start }
end

def stop_solid_queue
Process.waitpid(solid_queue_pid, Process::WNOHANG)
log "Stopping Solid Queue..."
Expand Down
1 change: 1 addition & 0 deletions lib/solid_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
require "active_job/concurrency_controls"

require "solid_queue/app_executor"
require "solid_queue/async_supervisor"
require "solid_queue/processes/supervised"
require "solid_queue/processes/registrable"
require "solid_queue/processes/interruptible"
Expand Down
22 changes: 22 additions & 0 deletions lib/solid_queue/async_supervisor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
module SolidQueue
class AsyncSupervisor
class << self
def load(load_configuration_from: nil)
configuration = Configuration.new(mode: :async, load_from: load_configuration_from)
new(*configuration.processes)
end
end

def initialize(*configured_processes)
@configured_processes = Array(configured_processes)
end

def start
@configured_processes.each(&:start)
end

def stop
@configured_processes.each(&:stop)
end
end
end
11 changes: 10 additions & 1 deletion lib/solid_queue/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def processes
when :dispatch then dispatchers
when :work then workers
when :all then dispatchers + workers
when :async then dispatchers + async_workers
else raise "Invalid mode #{mode}"
end
end
Expand All @@ -40,8 +41,16 @@ def workers
end
end

def async_workers
workers_options.select do |worker_options|
worker_options.fetch(:processes, WORKER_DEFAULTS[:processes]) == 0
end.map do |worker_options|
SolidQueue::Worker.new(**worker_options.with_defaults(WORKER_DEFAULTS))
end
end

def dispatchers
if mode.in? %i[ dispatch all]
if mode.in? %i[ dispatch all async]
dispatchers_options.flat_map do |dispatcher_options|
SolidQueue::Dispatcher.new(**dispatcher_options)
end
Expand Down
2 changes: 2 additions & 0 deletions test/dummy/config/puma.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,5 @@
# Allow puma to be restarted by `bin/rails restart` command.
plugin :tmp_restart
plugin :solid_queue

solid_queue_mode :async
3 changes: 3 additions & 0 deletions test/dummy/config/solid_queue.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ default: &default
threads: 3
- queues: default
threads: 5
- queues: [default, background]
threads: 2
processes: 0 # async
dispatchers:
- polling_interval: 1
batch_size: 500
Expand Down