Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
4ea040f
Pass default options for each dispatcher
rosa Jan 4, 2024
f0c1134
Extract concurrency maintenance work to its own class and allow optin…
rosa Jan 4, 2024
16966b6
Add Fugit to parse cron-style schedules
rosa Dec 5, 2023
e554e7e
Refactor and simplify process callbacks setup
rosa Feb 16, 2024
d453743
Remove initial jitter for dispatcher
rosa Feb 16, 2024
c383d15
Use Zeitwerk as autoloader instead of explicit requires
rosa Feb 16, 2024
d871c27
Move #preserve_finished_jobs? to SolidQueue module for consistency
rosa Feb 16, 2024
5e82dbf
Move ConcurrencyClerk under Dispatcher namespace
rosa Feb 16, 2024
f5e63a1
Refactor a bit the concurrency maintenance and stub recurring tasks
rosa Feb 16, 2024
676383f
Implement basic recurring task parsing and loading in a schedule
rosa Feb 16, 2024
efadb87
Rename RecurringTask#id to RecurringTask#key
rosa Feb 18, 2024
e7dd9d3
Create recurring_executions table
rosa Feb 18, 2024
8726b39
Fix configuration of recurring tasks when parsed via Configuration
rosa Feb 18, 2024
3e4c0f0
Track recurring executions to prevent enqueuing the same task more th…
rosa Feb 18, 2024
b2f74a3
Log thread errors in development, just like in test
rosa Feb 18, 2024
b64178a
Support passing kwargs as last element of the arguments array
rosa Feb 19, 2024
ab5e504
Remove or change some logging from info to debug level
rosa Feb 22, 2024
a28ccda
Store full recurring task configuration in process metadata
rosa Feb 26, 2024
6d510a5
Destroy recurring execution when deleting a job
rosa Feb 27, 2024
1043984
Add a clear_in_batches method to delete recurring executions
rosa Feb 27, 2024
e104a5f
Fix indentation in Process::Runnable
rosa Mar 14, 2024
8d1e27c
Refactor a bit the Poller vs. Runnable modules
rosa Mar 14, 2024
38db619
Group all development dependencies together in solid_queue.gemspec
rosa Mar 20, 2024
4ea0e2a
Add documentation about recurring tasks in README
rosa Mar 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,3 @@ git_source(:github) { |repo| "https://github.com/#{repo}.git" }

# Specify your gem's dependencies in solid_queue.gemspec.
gemspec

gem "mysql2"
gem "pg"
gem "sqlite3"
8 changes: 8 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ PATH
solid_queue (0.2.2)
activejob (>= 7.1)
activerecord (>= 7.1)
concurrent-ruby (~> 1.2.2)
fugit (~> 1.9.0)
railties (>= 7.1)

GEM
Expand Down Expand Up @@ -55,6 +57,11 @@ GEM
drb (2.1.1)
ruby2_keywords
erubi (1.12.0)
et-orbi (1.2.7)
tzinfo
fugit (1.9.0)
et-orbi (~> 1, >= 1.2.7)
raabro (~> 1.4)
globalid (1.2.1)
activesupport (>= 6.1)
i18n (1.14.1)
Expand All @@ -81,6 +88,7 @@ GEM
pg (1.5.4)
puma (6.4.2)
nio4r (~> 2.0)
raabro (1.4.0)
racc (1.7.3)
rack (3.0.8)
rack-session (2.0.0)
Expand Down
51 changes: 49 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

Solid Queue is a DB-based queuing backend for [Active Job](https://edgeguides.rubyonrails.org/active_job_basics.html), designed with simplicity and performance in mind.

Besides regular job enqueuing and processing, Solid Queue supports delayed jobs, concurrency controls, pausing queues, numeric priorities per job, priorities by queue order, and bulk enqueuing (`enqueue_all` for Active Job's `perform_all_later`). _Improvements to logging and instrumentation, a better CLI tool, a way to run within an existing process in "async" mode, unique jobs and recurring, cron-like tasks are coming very soon._
Besides regular job enqueuing and processing, Solid Queue supports delayed jobs, concurrency controls, pausing queues, numeric priorities per job, priorities by queue order, and bulk enqueuing (`enqueue_all` for Active Job's `perform_all_later`). _Improvements to logging and instrumentation, a better CLI tool, a way to run within an existing process in "async" mode, and some way of specifying unique jobs are coming very soon._

Solid Queue can be used with SQL databases such as MySQL, PostgreSQL or SQLite, and it leverages the `FOR UPDATE SKIP LOCKED` clause, if available, to avoid blocking and waiting on locks when polling jobs. It relies on Active Job for retries, discarding, error handling, serialization, or delays, and it's compatible with Ruby on Rails multi-threading.

Expand Down Expand Up @@ -77,7 +77,7 @@ Besides Rails 7.1, Solid Queue works best with MySQL 8+ or PostgreSQL 9.5+, as t

We have three types of processes in Solid Queue:
- _Workers_ are in charge of picking jobs ready to run from queues and processing them. They work off the `solid_queue_ready_executions` table.
- _Dispatchers_ are in charge of selecting jobs scheduled to run in the future that are due and _dispatching_ them, which is simply moving them from the `solid_queue_scheduled_executions` table over to the `solid_queue_ready_executions` table so that workers can pick them up. They also do some maintenance work related to concurrency controls.
- _Dispatchers_ are in charge of selecting jobs scheduled to run in the future that are due and _dispatching_ them, which is simply moving them from the `solid_queue_scheduled_executions` table over to the `solid_queue_ready_executions` table so that workers can pick them up. They're also in charge of managing [recurring tasks](#recurring-tasks), dispatching jobs to process them according to their schedule. On top of that, they do some maintenance work related to [concurrency controls](#concurrency-controls).
- The _supervisor_ forks workers and dispatchers according to the configuration, controls their heartbeats, and sends them signals to stop and start them when needed.

By default, Solid Queue will try to find your configuration under `config/solid_queue.yml`, but you can set a different path using the environment variable `SOLID_QUEUE_CONFIG`. This is what this configuration looks like:
Expand Down Expand Up @@ -119,6 +119,8 @@ Everything is optional. If no configuration is provided, Solid Queue will run wi
Finally, you can combine prefixes with exact names, like `[ staging*, background ]`, and the behaviour with respect to order will be the same as with only exact names.
- `threads`: this is the max size of the thread pool that each worker will have to run jobs. Each worker will fetch this number of jobs from their queue(s), at most and will post them to the thread pool to be run. By default, this is `3`. Only workers have this setting.
- `processes`: this is the number of worker processes that will be forked by the supervisor with the settings given. By default, this is `1`, just a single process. This setting is useful if you want to dedicate more than one CPU core to a queue or queues with the same configuration. Only workers have this setting.
- `concurrency_maintenance`: whether the dispatcher will perform the concurrency maintenance work. This is `true` by default, and it's useful if you don't use any [concurrency controls](#concurrency-controls) and want to disable it or if you run multiple dispatchers and want some of them to just dispatch jobs without doing anything else.
- `recurring_tasks`: a list of recurring tasks the dispatcher will manage. Read more details about this one in the [Recurring tasks](#recurring-tasks) section.


### Queue order and priorities
Expand Down Expand Up @@ -265,3 +267,48 @@ Solid Queue has been inspired by [resque](https://github.com/resque/resque) and

## License
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be at the end?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, @brunoprietog thanks for spotting this! It should totally be at the end 😆

The gem is available as open source under the terms of the [MIT License](https://opensource.org/licenses/MIT).

## Recurring tasks
Solid Queue supports defining recurring tasks that run at specific times in the future, on a regular basis like cron jobs. These are managed by dispatcher processes and as such, they can be defined in the dispatcher's configuration like this:
```yml
dispatchers:
- polling_interval: 1
batch_size: 500
recurring_tasks:
my_periodic_job:
class: MyJob
args: [ 42, { status: "custom_status" } ]
schedule: every second
```
`recurring_tasks` is a hash/dictionary, and the key will be the task key internally. Each task needs to have a class, which will be the job class to enqueue, and a schedule. The schedule is parsed using [Fugit](https://github.com/floraison/fugit), so it accepts anything [that Fugit accepts as a cron](https://github.com/floraison/fugit?tab=readme-ov-file#fugitcron). You can also provide arguments to be passed to the job, as a single argument, a hash, or an array of arguments that can also include kwargs as the last element in the array.

The job in the example configuration above will be enqueued every second as:
```ruby
MyJob.perform_later(42, status: "custom_status")
```

Tasks are enqueued at their corresponding times by the dispatcher that owns them, and each task schedules the next one. This is pretty much [inspired by what GoodJob does](https://github.com/bensheldon/good_job/blob/994ecff5323bf0337e10464841128fda100750e6/lib/good_job/cron_manager.rb).

It's possible to run multiple dispatchers with the same `recurring_tasks` configuration. To avoid enqueuing duplicate tasks at the same time, an entry in a new `solid_queue_recurring_executions` table is created in the same transaction as the job is enqueued. This table has a unique index on `task_key` and `run_at`, ensuring only one entry per task per time will be created. This only works if you have `preserve_finished_jobs` set to `true` (the default), and the guarantee applies as long as you keep the jobs around.

Finally, it's possible to configure jobs that aren't handled by Solid Queue. That's it, you can a have a job like this in your app:
```ruby
class MyResqueJob < ApplicationJob
self.queue_adapter = :resque

def perform(arg)
# ..
end
end
```

You can still configure this in Solid Queue:
```yml
dispatchers:
- recurring_tasks:
my_periodic_resque_job:
class: MyResqueJob
args: 22
schedule: "*/5 * * * *"
```
and the job will be enqueued via `perform_later` so it'll run in Resque. However, in this case we won't track any `solid_queue_recurring_execution` record for it and there won't be any guarantees that the job is enqueued only once each time.
2 changes: 1 addition & 1 deletion app/models/solid_queue/blocked_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def release
promote_to_ready
destroy!

SolidQueue.logger.info("[SolidQueue] Unblocked job #{job.id} under #{concurrency_key}")
SolidQueue.logger.debug("[SolidQueue] Unblocked job #{job.id} under #{concurrency_key}")
end
end
end
Expand Down
1 change: 0 additions & 1 deletion app/models/solid_queue/claimed_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ def claiming(job_ids, process_id, &block)
insert_all!(job_data)
where(job_id: job_ids, process_id: process_id).load.tap do |claimed|
block.call(claimed)
SolidQueue.logger.info("[SolidQueue] Claimed #{claimed.size} jobs")
end
end

Expand Down
2 changes: 1 addition & 1 deletion app/models/solid_queue/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module SolidQueue
class Job < Record
include Executable
include Executable, Clearable, Recurrable

serialize :arguments, coder: JSON

Expand Down
8 changes: 2 additions & 6 deletions app/models/solid_queue/job/executable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ module Executable
extend ActiveSupport::Concern

included do
include Clearable, ConcurrencyControls, Schedulable
include ConcurrencyControls, Schedulable

has_one :ready_execution
has_one :claimed_execution
Expand Down Expand Up @@ -78,7 +78,7 @@ def dispatch_bypassing_concurrency_limits
end

def finished!
if preserve_finished_jobs?
if SolidQueue.preserve_finished_jobs?
touch(:finished_at)
else
destroy!
Expand Down Expand Up @@ -117,10 +117,6 @@ def ready
def execution
%w[ ready claimed failed ].reduce(nil) { |acc, status| acc || public_send("#{status}_execution") }
end

def preserve_finished_jobs?
SolidQueue.preserve_finished_jobs
end
end
end
end
13 changes: 13 additions & 0 deletions app/models/solid_queue/job/recurrable.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# frozen_string_literal: true

module SolidQueue
class Job
module Recurrable
extend ActiveSupport::Concern

included do
has_one :recurring_execution, dependent: :destroy
end
end
end
end
26 changes: 26 additions & 0 deletions app/models/solid_queue/recurring_execution.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# frozen_string_literal: true

module SolidQueue
class RecurringExecution < Execution
scope :clearable, -> { where.missing(:job) }

class << self
def record(task_key, run_at, &block)
transaction do
if job_id = block.call
create!(job_id: job_id, task_key: task_key, run_at: run_at)
end
end
rescue ActiveRecord::RecordNotUnique
SolidQueue.logger.info("[SolidQueue] Skipped recurring task #{task_key} at #{run_at} — already dispatched")
end

def clear_in_batches(batch_size: 500)
loop do
records_deleted = clearable.limit(batch_size).delete_all
break if records_deleted == 0
end
end
end
end
end
14 changes: 14 additions & 0 deletions db/migrate/20240218110712_create_recurring_executions.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
class CreateRecurringExecutions < ActiveRecord::Migration[7.1]
def change
create_table :solid_queue_recurring_executions do |t|
t.references :job, index: { unique: true }, null: false
t.string :task_key, null: false
t.datetime :run_at, null: false
t.datetime :created_at, null: false

t.index [ :task_key, :run_at ], unique: true
end

add_foreign_key :solid_queue_recurring_executions, :solid_queue_jobs, column: :job_id, on_delete: :cascade
end
end
44 changes: 21 additions & 23 deletions lib/solid_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,16 @@
require "solid_queue/version"
require "solid_queue/engine"

require "active_job/queue_adapters/solid_queue_adapter"
require "active_job/concurrency_controls"

require "solid_queue/app_executor"
require "solid_queue/processes/supervised"
require "solid_queue/processes/registrable"
require "solid_queue/processes/interruptible"
require "solid_queue/processes/pidfile"
require "solid_queue/processes/procline"
require "solid_queue/processes/poller"
require "solid_queue/processes/base"
require "solid_queue/processes/runnable"
require "solid_queue/processes/signals"
require "solid_queue/configuration"
require "solid_queue/pool"
require "solid_queue/worker"
require "solid_queue/dispatcher"
require "solid_queue/supervisor"
require "active_job"
require "active_job/queue_adapters"

require "zeitwerk"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️


loader = Zeitwerk::Loader.for_gem(warn_on_extra_files: false)
loader.ignore("#{__dir__}/solid_queue/tasks.rb")
loader.ignore("#{__dir__}/generators")
loader.ignore("#{__dir__}/puma")
loader.setup

module SolidQueue
mattr_accessor :logger, default: ActiveSupport::Logger.new($stdout)
Expand All @@ -42,11 +34,17 @@ module SolidQueue
mattr_accessor :clear_finished_jobs_after, default: 1.day
mattr_accessor :default_concurrency_control_period, default: 3.minutes

def self.supervisor?
supervisor
end
class << self
def supervisor?
supervisor
end

def silence_polling?
silence_polling
end

def self.silence_polling?
silence_polling
def preserve_finished_jobs?
preserve_finished_jobs
end
end
end
17 changes: 13 additions & 4 deletions lib/solid_queue/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ class Configuration
DISPATCHER_DEFAULTS = {
batch_size: 500,
polling_interval: 1,
concurrency_maintenance_interval: 600
concurrency_maintenance: true,
concurrency_maintenance_interval: 600,
recurring_tasks: []
}

def initialize(mode: :work, load_from: nil)
Expand All @@ -33,7 +35,7 @@ def workers
if mode.in? %i[ work all]
workers_options.flat_map do |worker_options|
processes = worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
processes.times.collect { SolidQueue::Worker.new(**worker_options.with_defaults(WORKER_DEFAULTS)) }
processes.times.map { Worker.new(**worker_options.with_defaults(WORKER_DEFAULTS)) }
end
else
[]
Expand All @@ -42,8 +44,10 @@ def workers

def dispatchers
if mode.in? %i[ dispatch all]
dispatchers_options.flat_map do |dispatcher_options|
SolidQueue::Dispatcher.new(**dispatcher_options)
dispatchers_options.map do |dispatcher_options|
recurring_tasks = parse_recurring_tasks dispatcher_options[:recurring_tasks]

Dispatcher.new **dispatcher_options.merge(recurring_tasks: recurring_tasks).with_defaults(DISPATCHER_DEFAULTS)
end
end
end
Expand Down Expand Up @@ -73,6 +77,11 @@ def dispatchers_options
.map { |options| options.dup.symbolize_keys }
end

def parse_recurring_tasks(tasks)
Array(tasks).map do |id, options|
Dispatcher::RecurringTask.from_configuration(id, **options)
end.select(&:valid?)
end

def load_config_from(file_or_hash)
case file_or_hash
Expand Down
Loading