diff --git a/Gemfile b/Gemfile index 0461dab1..5f929b7b 100644 --- a/Gemfile +++ b/Gemfile @@ -3,7 +3,3 @@ git_source(:github) { |repo| "https://github.com/#{repo}.git" } # Specify your gem's dependencies in solid_queue.gemspec. gemspec - -gem "mysql2" -gem "pg" -gem "sqlite3" diff --git a/Gemfile.lock b/Gemfile.lock index e83b38c0..c060fb77 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -4,6 +4,8 @@ PATH solid_queue (0.2.2) activejob (>= 7.1) activerecord (>= 7.1) + concurrent-ruby (~> 1.2.2) + fugit (~> 1.9.0) railties (>= 7.1) GEM @@ -55,6 +57,11 @@ GEM drb (2.1.1) ruby2_keywords erubi (1.12.0) + et-orbi (1.2.7) + tzinfo + fugit (1.9.0) + et-orbi (~> 1, >= 1.2.7) + raabro (~> 1.4) globalid (1.2.1) activesupport (>= 6.1) i18n (1.14.1) @@ -81,6 +88,7 @@ GEM pg (1.5.4) puma (6.4.2) nio4r (~> 2.0) + raabro (1.4.0) racc (1.7.3) rack (3.0.8) rack-session (2.0.0) diff --git a/README.md b/README.md index 9726bb9c..4446d286 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ Solid Queue is a DB-based queuing backend for [Active Job](https://edgeguides.rubyonrails.org/active_job_basics.html), designed with simplicity and performance in mind. -Besides regular job enqueuing and processing, Solid Queue supports delayed jobs, concurrency controls, pausing queues, numeric priorities per job, priorities by queue order, and bulk enqueuing (`enqueue_all` for Active Job's `perform_all_later`). _Improvements to logging and instrumentation, a better CLI tool, a way to run within an existing process in "async" mode, unique jobs and recurring, cron-like tasks are coming very soon._ +Besides regular job enqueuing and processing, Solid Queue supports delayed jobs, concurrency controls, pausing queues, numeric priorities per job, priorities by queue order, and bulk enqueuing (`enqueue_all` for Active Job's `perform_all_later`). _Improvements to logging and instrumentation, a better CLI tool, a way to run within an existing process in "async" mode, and some way of specifying unique jobs are coming very soon._ Solid Queue can be used with SQL databases such as MySQL, PostgreSQL or SQLite, and it leverages the `FOR UPDATE SKIP LOCKED` clause, if available, to avoid blocking and waiting on locks when polling jobs. It relies on Active Job for retries, discarding, error handling, serialization, or delays, and it's compatible with Ruby on Rails multi-threading. @@ -77,7 +77,7 @@ Besides Rails 7.1, Solid Queue works best with MySQL 8+ or PostgreSQL 9.5+, as t We have three types of processes in Solid Queue: - _Workers_ are in charge of picking jobs ready to run from queues and processing them. They work off the `solid_queue_ready_executions` table. -- _Dispatchers_ are in charge of selecting jobs scheduled to run in the future that are due and _dispatching_ them, which is simply moving them from the `solid_queue_scheduled_executions` table over to the `solid_queue_ready_executions` table so that workers can pick them up. They also do some maintenance work related to concurrency controls. +- _Dispatchers_ are in charge of selecting jobs scheduled to run in the future that are due and _dispatching_ them, which is simply moving them from the `solid_queue_scheduled_executions` table over to the `solid_queue_ready_executions` table so that workers can pick them up. They're also in charge of managing [recurring tasks](#recurring-tasks), dispatching jobs to process them according to their schedule. On top of that, they do some maintenance work related to [concurrency controls](#concurrency-controls). - The _supervisor_ forks workers and dispatchers according to the configuration, controls their heartbeats, and sends them signals to stop and start them when needed. By default, Solid Queue will try to find your configuration under `config/solid_queue.yml`, but you can set a different path using the environment variable `SOLID_QUEUE_CONFIG`. This is what this configuration looks like: @@ -119,6 +119,8 @@ Everything is optional. If no configuration is provided, Solid Queue will run wi Finally, you can combine prefixes with exact names, like `[ staging*, background ]`, and the behaviour with respect to order will be the same as with only exact names. - `threads`: this is the max size of the thread pool that each worker will have to run jobs. Each worker will fetch this number of jobs from their queue(s), at most and will post them to the thread pool to be run. By default, this is `3`. Only workers have this setting. - `processes`: this is the number of worker processes that will be forked by the supervisor with the settings given. By default, this is `1`, just a single process. This setting is useful if you want to dedicate more than one CPU core to a queue or queues with the same configuration. Only workers have this setting. +- `concurrency_maintenance`: whether the dispatcher will perform the concurrency maintenance work. This is `true` by default, and it's useful if you don't use any [concurrency controls](#concurrency-controls) and want to disable it or if you run multiple dispatchers and want some of them to just dispatch jobs without doing anything else. +- `recurring_tasks`: a list of recurring tasks the dispatcher will manage. Read more details about this one in the [Recurring tasks](#recurring-tasks) section. ### Queue order and priorities @@ -265,3 +267,48 @@ Solid Queue has been inspired by [resque](https://github.com/resque/resque) and ## License The gem is available as open source under the terms of the [MIT License](https://opensource.org/licenses/MIT). + +## Recurring tasks +Solid Queue supports defining recurring tasks that run at specific times in the future, on a regular basis like cron jobs. These are managed by dispatcher processes and as such, they can be defined in the dispatcher's configuration like this: +```yml + dispatchers: + - polling_interval: 1 + batch_size: 500 + recurring_tasks: + my_periodic_job: + class: MyJob + args: [ 42, { status: "custom_status" } ] + schedule: every second +``` +`recurring_tasks` is a hash/dictionary, and the key will be the task key internally. Each task needs to have a class, which will be the job class to enqueue, and a schedule. The schedule is parsed using [Fugit](https://github.com/floraison/fugit), so it accepts anything [that Fugit accepts as a cron](https://github.com/floraison/fugit?tab=readme-ov-file#fugitcron). You can also provide arguments to be passed to the job, as a single argument, a hash, or an array of arguments that can also include kwargs as the last element in the array. + +The job in the example configuration above will be enqueued every second as: +```ruby +MyJob.perform_later(42, status: "custom_status") +``` + +Tasks are enqueued at their corresponding times by the dispatcher that owns them, and each task schedules the next one. This is pretty much [inspired by what GoodJob does](https://github.com/bensheldon/good_job/blob/994ecff5323bf0337e10464841128fda100750e6/lib/good_job/cron_manager.rb). + +It's possible to run multiple dispatchers with the same `recurring_tasks` configuration. To avoid enqueuing duplicate tasks at the same time, an entry in a new `solid_queue_recurring_executions` table is created in the same transaction as the job is enqueued. This table has a unique index on `task_key` and `run_at`, ensuring only one entry per task per time will be created. This only works if you have `preserve_finished_jobs` set to `true` (the default), and the guarantee applies as long as you keep the jobs around. + +Finally, it's possible to configure jobs that aren't handled by Solid Queue. That's it, you can a have a job like this in your app: +```ruby +class MyResqueJob < ApplicationJob + self.queue_adapter = :resque + + def perform(arg) + # .. + end +end +``` + +You can still configure this in Solid Queue: +```yml + dispatchers: + - recurring_tasks: + my_periodic_resque_job: + class: MyResqueJob + args: 22 + schedule: "*/5 * * * *" +``` +and the job will be enqueued via `perform_later` so it'll run in Resque. However, in this case we won't track any `solid_queue_recurring_execution` record for it and there won't be any guarantees that the job is enqueued only once each time. diff --git a/app/models/solid_queue/blocked_execution.rb b/app/models/solid_queue/blocked_execution.rb index df41df07..a6f00e84 100644 --- a/app/models/solid_queue/blocked_execution.rb +++ b/app/models/solid_queue/blocked_execution.rb @@ -43,7 +43,7 @@ def release promote_to_ready destroy! - SolidQueue.logger.info("[SolidQueue] Unblocked job #{job.id} under #{concurrency_key}") + SolidQueue.logger.debug("[SolidQueue] Unblocked job #{job.id} under #{concurrency_key}") end end end diff --git a/app/models/solid_queue/claimed_execution.rb b/app/models/solid_queue/claimed_execution.rb index 20096031..c8cd95cc 100644 --- a/app/models/solid_queue/claimed_execution.rb +++ b/app/models/solid_queue/claimed_execution.rb @@ -16,7 +16,6 @@ def claiming(job_ids, process_id, &block) insert_all!(job_data) where(job_id: job_ids, process_id: process_id).load.tap do |claimed| block.call(claimed) - SolidQueue.logger.info("[SolidQueue] Claimed #{claimed.size} jobs") end end diff --git a/app/models/solid_queue/job.rb b/app/models/solid_queue/job.rb index e06fb06a..304861d3 100644 --- a/app/models/solid_queue/job.rb +++ b/app/models/solid_queue/job.rb @@ -2,7 +2,7 @@ module SolidQueue class Job < Record - include Executable + include Executable, Clearable, Recurrable serialize :arguments, coder: JSON diff --git a/app/models/solid_queue/job/executable.rb b/app/models/solid_queue/job/executable.rb index 9ba720d2..fd046fda 100644 --- a/app/models/solid_queue/job/executable.rb +++ b/app/models/solid_queue/job/executable.rb @@ -6,7 +6,7 @@ module Executable extend ActiveSupport::Concern included do - include Clearable, ConcurrencyControls, Schedulable + include ConcurrencyControls, Schedulable has_one :ready_execution has_one :claimed_execution @@ -78,7 +78,7 @@ def dispatch_bypassing_concurrency_limits end def finished! - if preserve_finished_jobs? + if SolidQueue.preserve_finished_jobs? touch(:finished_at) else destroy! @@ -117,10 +117,6 @@ def ready def execution %w[ ready claimed failed ].reduce(nil) { |acc, status| acc || public_send("#{status}_execution") } end - - def preserve_finished_jobs? - SolidQueue.preserve_finished_jobs - end end end end diff --git a/app/models/solid_queue/job/recurrable.rb b/app/models/solid_queue/job/recurrable.rb new file mode 100644 index 00000000..6d70dc44 --- /dev/null +++ b/app/models/solid_queue/job/recurrable.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +module SolidQueue + class Job + module Recurrable + extend ActiveSupport::Concern + + included do + has_one :recurring_execution, dependent: :destroy + end + end + end +end diff --git a/app/models/solid_queue/recurring_execution.rb b/app/models/solid_queue/recurring_execution.rb new file mode 100644 index 00000000..02365168 --- /dev/null +++ b/app/models/solid_queue/recurring_execution.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +module SolidQueue + class RecurringExecution < Execution + scope :clearable, -> { where.missing(:job) } + + class << self + def record(task_key, run_at, &block) + transaction do + if job_id = block.call + create!(job_id: job_id, task_key: task_key, run_at: run_at) + end + end + rescue ActiveRecord::RecordNotUnique + SolidQueue.logger.info("[SolidQueue] Skipped recurring task #{task_key} at #{run_at} — already dispatched") + end + + def clear_in_batches(batch_size: 500) + loop do + records_deleted = clearable.limit(batch_size).delete_all + break if records_deleted == 0 + end + end + end + end +end diff --git a/db/migrate/20240218110712_create_recurring_executions.rb b/db/migrate/20240218110712_create_recurring_executions.rb new file mode 100644 index 00000000..87c2ac64 --- /dev/null +++ b/db/migrate/20240218110712_create_recurring_executions.rb @@ -0,0 +1,14 @@ +class CreateRecurringExecutions < ActiveRecord::Migration[7.1] + def change + create_table :solid_queue_recurring_executions do |t| + t.references :job, index: { unique: true }, null: false + t.string :task_key, null: false + t.datetime :run_at, null: false + t.datetime :created_at, null: false + + t.index [ :task_key, :run_at ], unique: true + end + + add_foreign_key :solid_queue_recurring_executions, :solid_queue_jobs, column: :job_id, on_delete: :cascade + end +end diff --git a/lib/solid_queue.rb b/lib/solid_queue.rb index 27a294d2..33feeb07 100644 --- a/lib/solid_queue.rb +++ b/lib/solid_queue.rb @@ -3,24 +3,16 @@ require "solid_queue/version" require "solid_queue/engine" -require "active_job/queue_adapters/solid_queue_adapter" -require "active_job/concurrency_controls" - -require "solid_queue/app_executor" -require "solid_queue/processes/supervised" -require "solid_queue/processes/registrable" -require "solid_queue/processes/interruptible" -require "solid_queue/processes/pidfile" -require "solid_queue/processes/procline" -require "solid_queue/processes/poller" -require "solid_queue/processes/base" -require "solid_queue/processes/runnable" -require "solid_queue/processes/signals" -require "solid_queue/configuration" -require "solid_queue/pool" -require "solid_queue/worker" -require "solid_queue/dispatcher" -require "solid_queue/supervisor" +require "active_job" +require "active_job/queue_adapters" + +require "zeitwerk" + +loader = Zeitwerk::Loader.for_gem(warn_on_extra_files: false) +loader.ignore("#{__dir__}/solid_queue/tasks.rb") +loader.ignore("#{__dir__}/generators") +loader.ignore("#{__dir__}/puma") +loader.setup module SolidQueue mattr_accessor :logger, default: ActiveSupport::Logger.new($stdout) @@ -42,11 +34,17 @@ module SolidQueue mattr_accessor :clear_finished_jobs_after, default: 1.day mattr_accessor :default_concurrency_control_period, default: 3.minutes - def self.supervisor? - supervisor - end + class << self + def supervisor? + supervisor + end + + def silence_polling? + silence_polling + end - def self.silence_polling? - silence_polling + def preserve_finished_jobs? + preserve_finished_jobs + end end end diff --git a/lib/solid_queue/configuration.rb b/lib/solid_queue/configuration.rb index 1206ebb5..a6006024 100644 --- a/lib/solid_queue/configuration.rb +++ b/lib/solid_queue/configuration.rb @@ -12,7 +12,9 @@ class Configuration DISPATCHER_DEFAULTS = { batch_size: 500, polling_interval: 1, - concurrency_maintenance_interval: 600 + concurrency_maintenance: true, + concurrency_maintenance_interval: 600, + recurring_tasks: [] } def initialize(mode: :work, load_from: nil) @@ -33,7 +35,7 @@ def workers if mode.in? %i[ work all] workers_options.flat_map do |worker_options| processes = worker_options.fetch(:processes, WORKER_DEFAULTS[:processes]) - processes.times.collect { SolidQueue::Worker.new(**worker_options.with_defaults(WORKER_DEFAULTS)) } + processes.times.map { Worker.new(**worker_options.with_defaults(WORKER_DEFAULTS)) } end else [] @@ -42,8 +44,10 @@ def workers def dispatchers if mode.in? %i[ dispatch all] - dispatchers_options.flat_map do |dispatcher_options| - SolidQueue::Dispatcher.new(**dispatcher_options) + dispatchers_options.map do |dispatcher_options| + recurring_tasks = parse_recurring_tasks dispatcher_options[:recurring_tasks] + + Dispatcher.new **dispatcher_options.merge(recurring_tasks: recurring_tasks).with_defaults(DISPATCHER_DEFAULTS) end end end @@ -73,6 +77,11 @@ def dispatchers_options .map { |options| options.dup.symbolize_keys } end + def parse_recurring_tasks(tasks) + Array(tasks).map do |id, options| + Dispatcher::RecurringTask.from_configuration(id, **options) + end.select(&:valid?) + end def load_config_from(file_or_hash) case file_or_hash diff --git a/lib/solid_queue/dispatcher.rb b/lib/solid_queue/dispatcher.rb index b52370fa..994be6aa 100644 --- a/lib/solid_queue/dispatcher.rb +++ b/lib/solid_queue/dispatcher.rb @@ -2,72 +2,57 @@ module SolidQueue class Dispatcher < Processes::Base - include Processes::Runnable, Processes::Poller + include Processes::Poller - attr_accessor :batch_size, :concurrency_maintenance_interval + attr_accessor :batch_size, :concurrency_maintenance, :recurring_schedule - set_callback :boot, :after, :launch_concurrency_maintenance - set_callback :shutdown, :before, :stop_concurrency_maintenance + after_boot :start_concurrency_maintenance, :load_recurring_schedule + before_shutdown :stop_concurrency_maintenance, :unload_recurring_schedule def initialize(**options) options = options.dup.with_defaults(SolidQueue::Configuration::DISPATCHER_DEFAULTS) @batch_size = options[:batch_size] @polling_interval = options[:polling_interval] - @concurrency_maintenance_interval = options[:concurrency_maintenance_interval] + + @concurrency_maintenance = ConcurrencyMaintenance.new(options[:concurrency_maintenance_interval], options[:batch_size]) if options[:concurrency_maintenance] + @recurring_schedule = RecurringSchedule.new(options[:recurring_tasks]) end private - def run + def poll batch = dispatch_next_batch - - unless batch.size > 0 - procline "waiting" - interruptible_sleep(polling_interval) - end + batch.size end def dispatch_next_batch with_polling_volume do - SolidQueue::ScheduledExecution.dispatch_next_batch(batch_size) + ScheduledExecution.dispatch_next_batch(batch_size) end end - def launch_concurrency_maintenance - @concurrency_maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: concurrency_maintenance_interval) do - expire_semaphores - unblock_blocked_executions - end - - @concurrency_maintenance_task.add_observer do |_, _, error| - handle_thread_error(error) if error - end - - @concurrency_maintenance_task.execute + def start_concurrency_maintenance + concurrency_maintenance&.start end - def stop_concurrency_maintenance - @concurrency_maintenance_task.shutdown + def load_recurring_schedule + recurring_schedule.load_tasks end - def expire_semaphores - wrap_in_app_executor do - Semaphore.expired.in_batches(of: batch_size, &:delete_all) - end + def stop_concurrency_maintenance + concurrency_maintenance&.stop end - def unblock_blocked_executions - wrap_in_app_executor do - BlockedExecution.unblock(batch_size) - end + def unload_recurring_schedule + recurring_schedule.unload_tasks end - def initial_jitter - Kernel.rand(0...polling_interval) + def set_procline + procline "waiting" end def metadata - super.merge(batch_size: batch_size) + super.merge(batch_size: batch_size, concurrency_maintenance_interval: concurrency_maintenance&.interval, recurring_schedule: recurring_schedule.tasks.presence ) end end end diff --git a/lib/solid_queue/dispatcher/concurrency_maintenance.rb b/lib/solid_queue/dispatcher/concurrency_maintenance.rb new file mode 100644 index 00000000..846cc9bf --- /dev/null +++ b/lib/solid_queue/dispatcher/concurrency_maintenance.rb @@ -0,0 +1,44 @@ +# frozen_string_literal: true + +module SolidQueue + class Dispatcher::ConcurrencyMaintenance + include AppExecutor + + attr_reader :interval, :batch_size + + def initialize(interval, batch_size) + @interval = interval + @batch_size = batch_size + end + + def start + @concurrency_maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: interval) do + expire_semaphores + unblock_blocked_executions + end + + @concurrency_maintenance_task.add_observer do |_, _, error| + handle_thread_error(error) if error + end + + @concurrency_maintenance_task.execute + end + + def stop + @concurrency_maintenance_task.shutdown + end + + private + def expire_semaphores + wrap_in_app_executor do + Semaphore.expired.in_batches(of: batch_size, &:delete_all) + end + end + + def unblock_blocked_executions + wrap_in_app_executor do + BlockedExecution.unblock(batch_size) + end + end + end +end diff --git a/lib/solid_queue/dispatcher/recurring_schedule.rb b/lib/solid_queue/dispatcher/recurring_schedule.rb new file mode 100644 index 00000000..33ae2deb --- /dev/null +++ b/lib/solid_queue/dispatcher/recurring_schedule.rb @@ -0,0 +1,56 @@ +# frozen_string_literal: true + +module SolidQueue + class Dispatcher::RecurringSchedule + include AppExecutor + + attr_reader :configured_tasks, :scheduled_tasks + + def initialize(tasks) + @configured_tasks = Array(tasks).map { |task| Dispatcher::RecurringTask.wrap(task) } + @scheduled_tasks = Concurrent::Hash.new + end + + def load_tasks + configured_tasks.each do |task| + load_task(task) + end + end + + def load_task(task) + scheduled_tasks[task.key] = schedule(task) + end + + def unload_tasks + scheduled_tasks.values.each(&:cancel) + scheduled_tasks.clear + end + + def tasks + configured_tasks.each_with_object({}) { |task, hsh| hsh[task.key] = task.to_h } + end + + def inspect + configured_tasks.map(&:to_s).join(" | ") + end + + private + def schedule(task) + scheduled_task = Concurrent::ScheduledTask.new(task.delay_from_now, args: [ self, task, task.next_time ]) do |thread_schedule, thread_task, thread_task_run_at| + thread_schedule.load_task(thread_task) + + wrap_in_app_executor do + thread_task.enqueue(at: thread_task_run_at) + end + end + + scheduled_task.add_observer do |_, _, error| + # Don't notify on task cancellation before execution, as this will happen normally + # as part of unloading tasks + handle_thread_error(error) if error && !error.is_a?(Concurrent::CancelledOperationError) + end + + scheduled_task.tap(&:execute) + end + end +end diff --git a/lib/solid_queue/dispatcher/recurring_task.rb b/lib/solid_queue/dispatcher/recurring_task.rb new file mode 100644 index 00000000..2f26207f --- /dev/null +++ b/lib/solid_queue/dispatcher/recurring_task.rb @@ -0,0 +1,85 @@ +require "fugit" + +module SolidQueue + class Dispatcher::RecurringTask + class << self + def wrap(args) + args.is_a?(self) ? args : from_configuration(args.first, **args.second) + end + + def from_configuration(key, **options) + new(key, class_name: options[:class], schedule: options[:schedule], arguments: options[:args]) + end + end + + attr_reader :key, :schedule, :class_name, :arguments + + def initialize(key, class_name:, schedule:, arguments: nil) + @key = key + @class_name = class_name + @schedule = schedule + @arguments = Array(arguments) + end + + def delay_from_now + [ (next_time - Time.current).to_f, 0 ].max + end + + def next_time + parsed_schedule.next_time.utc + end + + def enqueue(at:) + if using_solid_queue_adapter? + perform_later_and_record(run_at: at) + else + perform_later + end + end + + def valid? + parsed_schedule.instance_of?(Fugit::Cron) + end + + def to_s + "#{class_name}.perform_later(#{arguments.map(&:inspect).join(",")}) [ #{parsed_schedule.original.to_s} ]" + end + + def to_h + { + schedule: schedule, + class_name: class_name, + arguments: arguments + } + end + + private + def using_solid_queue_adapter? + job_class.queue_adapter_name.inquiry.solid_queue? + end + + def perform_later_and_record(run_at:) + RecurringExecution.record(key, run_at) { perform_later.provider_job_id } + end + + def perform_later + job_class.perform_later(*arguments_with_kwargs) + end + + def arguments_with_kwargs + if arguments.last.is_a?(Hash) + arguments[0...-1] + [ Hash.ruby2_keywords_hash(arguments.last) ] + else + arguments + end + end + + def parsed_schedule + @parsed_schedule ||= Fugit.parse(schedule) + end + + def job_class + @job_class ||= class_name.safe_constantize + end + end +end \ No newline at end of file diff --git a/lib/solid_queue/processes/base.rb b/lib/solid_queue/processes/base.rb index 3f56674a..825ce801 100644 --- a/lib/solid_queue/processes/base.rb +++ b/lib/solid_queue/processes/base.rb @@ -3,25 +3,8 @@ module SolidQueue module Processes class Base - include ActiveSupport::Callbacks - define_callbacks :boot, :shutdown - + include Callbacks # Defines callbacks needed by other concerns include AppExecutor, Registrable, Interruptible, Procline - - private - def observe_initial_delay - interruptible_sleep(initial_jitter) - end - - def boot - end - - def shutdown - end - - def initial_jitter - 0 - end end end end diff --git a/lib/solid_queue/processes/callbacks.rb b/lib/solid_queue/processes/callbacks.rb new file mode 100644 index 00000000..eb885c75 --- /dev/null +++ b/lib/solid_queue/processes/callbacks.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +module SolidQueue::Processes + module Callbacks + extend ActiveSupport::Concern + + included do + extend ActiveModel::Callbacks + define_model_callbacks :boot, :shutdown + end + + private + def boot + end + + def shutdown + end + end +end diff --git a/lib/solid_queue/processes/poller.rb b/lib/solid_queue/processes/poller.rb index aa772807..d27274ef 100644 --- a/lib/solid_queue/processes/poller.rb +++ b/lib/solid_queue/processes/poller.rb @@ -4,11 +4,39 @@ module SolidQueue::Processes module Poller extend ActiveSupport::Concern + include Runnable + included do attr_accessor :polling_interval end private + def run + if mode.async? + @thread = Thread.new { start_loop } + else + start_loop + end + end + + def start_loop + loop do + break if shutting_down? + + wrap_in_app_executor do + unless poll > 0 + interruptible_sleep(polling_interval) + end + end + end + ensure + run_callbacks(:shutdown) { shutdown } + end + + def poll + raise NotImplementedError + end + def with_polling_volume if SolidQueue.silence_polling? ActiveRecord::Base.logger.silence { yield } diff --git a/lib/solid_queue/processes/registrable.rb b/lib/solid_queue/processes/registrable.rb index 39541743..ffeac929 100644 --- a/lib/solid_queue/processes/registrable.rb +++ b/lib/solid_queue/processes/registrable.rb @@ -5,11 +5,10 @@ module Registrable extend ActiveSupport::Concern included do - set_callback :boot, :after, :register - set_callback :boot, :after, :launch_heartbeat + after_boot :register, :launch_heartbeat - set_callback :shutdown, :before, :stop_heartbeat - set_callback :shutdown, :after, :deregister + before_shutdown :stop_heartbeat + after_shutdown :deregister end def inspect @@ -26,7 +25,7 @@ def register pid: process_pid, hostname: hostname, supervisor: try(:supervisor), - metadata: metadata + metadata: metadata.compact end def deregister diff --git a/lib/solid_queue/processes/runnable.rb b/lib/solid_queue/processes/runnable.rb index 7b267ff9..da411d46 100644 --- a/lib/solid_queue/processes/runnable.rb +++ b/lib/solid_queue/processes/runnable.rb @@ -8,11 +8,9 @@ module Runnable def start @stopping = false - - observe_initial_delay run_callbacks(:boot) { boot } - start_loop + run end def stop @@ -20,60 +18,47 @@ def stop @thread&.join end - private - DEFAULT_MODE = :async + private + DEFAULT_MODE = :async - def mode - (@mode || DEFAULT_MODE).to_s.inquiry - end + def mode + (@mode || DEFAULT_MODE).to_s.inquiry + end - def boot - register_signal_handlers if supervised? - SolidQueue.logger.info("[SolidQueue] Starting #{self}") - end + def boot + if supervised? + register_signal_handlers + set_procline + end - def start_loop - if mode.async? - @thread = Thread.new { do_start_loop } - else - do_start_loop + SolidQueue.logger.info("[SolidQueue] Starting #{self}") end - end - def do_start_loop - loop do - break if shutting_down? - - wrap_in_app_executor do - run - end + def shutting_down? + stopping? || supervisor_went_away? || finished? end - ensure - run_callbacks(:shutdown) { shutdown } - end - def shutting_down? - stopping? || supervisor_went_away? || finished? - end + def run + raise NotImplementedError + end - def run - raise NotImplementedError - end + def stopping? + @stopping + end - def stopping? - @stopping - end + def finished? + running_inline? && all_work_completed? + end - def finished? - running_inline? && all_work_completed? - end + def all_work_completed? + false + end - def all_work_completed? - false - end + def set_procline + end - def running_inline? - mode.inline? - end + def running_inline? + mode.inline? + end end end diff --git a/lib/solid_queue/processes/supervised.rb b/lib/solid_queue/processes/supervised.rb index 0d37d5e5..395da428 100644 --- a/lib/solid_queue/processes/supervised.rb +++ b/lib/solid_queue/processes/supervised.rb @@ -14,6 +14,10 @@ def supervised_by(process) end private + def set_procline + procline "waiting" + end + def supervisor_went_away? supervised? && supervisor&.pid != ::Process.ppid end diff --git a/lib/solid_queue/supervisor.rb b/lib/solid_queue/supervisor.rb index 35a8218f..6b3e029e 100644 --- a/lib/solid_queue/supervisor.rb +++ b/lib/solid_queue/supervisor.rb @@ -4,7 +4,7 @@ module SolidQueue class Supervisor < Processes::Base include Processes::Signals - set_callback :boot, :after, :launch_process_prune + after_boot :launch_process_prune class << self def start(mode: :work, load_configuration_from: nil) diff --git a/lib/solid_queue/worker.rb b/lib/solid_queue/worker.rb index 2b3f0a79..88265b3d 100644 --- a/lib/solid_queue/worker.rb +++ b/lib/solid_queue/worker.rb @@ -2,7 +2,7 @@ module SolidQueue class Worker < Processes::Base - include Processes::Runnable, Processes::Poller + include Processes::Poller attr_accessor :queues, :pool @@ -15,22 +15,17 @@ def initialize(**options) end private - def run - polled_executions = poll - - if polled_executions.size > 0 - procline "performing #{polled_executions.count} jobs" - - polled_executions.each do |execution| + def poll + claim_executions.then do |executions| + executions.each do |execution| pool.post(execution) end - else - procline "waiting for jobs in #{queues.join(",")}" - interruptible_sleep(polling_interval) + + executions.size end end - def poll + def claim_executions with_polling_volume do SolidQueue::ReadyExecution.claim(queues, pool.idle_threads, process.id) end @@ -47,6 +42,10 @@ def all_work_completed? SolidQueue::ReadyExecution.aggregated_count_across(queues).zero? end + def set_procline + procline "waiting for jobs in #{queues.join(",")}" + end + def metadata super.merge(queues: queues.join(","), thread_pool_size: pool.size) end diff --git a/solid_queue.gemspec b/solid_queue.gemspec index c752ae40..a4e90c10 100644 --- a/solid_queue.gemspec +++ b/solid_queue.gemspec @@ -21,7 +21,12 @@ Gem::Specification.new do |spec| spec.add_dependency "activerecord", rails_version spec.add_dependency "activejob", rails_version spec.add_dependency "railties", rails_version + spec.add_dependency "concurrent-ruby", "~> 1.2.2" + spec.add_dependency "fugit", "~> 1.9.0" spec.add_development_dependency "debug" spec.add_development_dependency "mocha" spec.add_development_dependency "puma" + spec.add_development_dependency "mysql2" + spec.add_development_dependency "pg" + spec.add_development_dependency "sqlite3" end diff --git a/test/dummy/app/jobs/store_result_job.rb b/test/dummy/app/jobs/store_result_job.rb index c6ad5e7e..ab5edff6 100644 --- a/test/dummy/app/jobs/store_result_job.rb +++ b/test/dummy/app/jobs/store_result_job.rb @@ -1,13 +1,13 @@ class StoreResultJob < ApplicationJob queue_as :background - def perform(value, pause: nil, exception: nil, exit: nil) + def perform(value, status: :completed, pause: nil, exception: nil, exit: nil) result = JobResult.create!(queue_name: queue_name, status: "started", value: value) sleep(pause) if pause raise exception.new if exception exit! if exit - result.update!(status: "completed") + result.update!(status: status) end end diff --git a/test/dummy/config/environments/development.rb b/test/dummy/config/environments/development.rb index e3714846..4d9f1d46 100644 --- a/test/dummy/config/environments/development.rb +++ b/test/dummy/config/environments/development.rb @@ -58,4 +58,7 @@ # config.action_cable.disable_request_forgery_protection = true config.solid_queue.logger = ActiveSupport::Logger.new(nil) + + logger = ActiveSupport::Logger.new(STDOUT) + config.solid_queue.on_thread_error = ->(exception) { logger.error("#{exception.class.name}: #{exception.message}\n#{(exception.backtrace || caller)&.join("\n")}") } end diff --git a/test/dummy/config/environments/test.rb b/test/dummy/config/environments/test.rb index cdf9ddba..33240b02 100644 --- a/test/dummy/config/environments/test.rb +++ b/test/dummy/config/environments/test.rb @@ -49,7 +49,7 @@ # config.action_view.annotate_rendered_view_with_filenames = true logger = ActiveSupport::Logger.new(STDOUT) - config.solid_queue.on_thread_error = ->(exception) { logger.error("#{exception.class.name}: #{exception.message}\n#{exception.backtrace.join("\n")}") } + config.solid_queue.on_thread_error = ->(exception) { logger.error("#{exception.class.name}: #{exception.message}\n#{(exception.backtrace || caller)&.join("\n")}") } config.solid_queue.logger = ActiveSupport::Logger.new(nil) config.solid_queue.shutdown_timeout = 2.seconds diff --git a/test/dummy/config/solid_queue.yml b/test/dummy/config/solid_queue.yml index cce3e9cd..efa9a9e0 100644 --- a/test/dummy/config/solid_queue.yml +++ b/test/dummy/config/solid_queue.yml @@ -7,6 +7,11 @@ default: &default dispatchers: - polling_interval: 1 batch_size: 500 + recurring_tasks: + periodic_store_result: + class: StoreResultJob + args: [ 42, { status: "custom_status" } ] + schedule: every second development: <<: *default diff --git a/test/dummy/db/schema.rb b/test/dummy/db/schema.rb index c60d200c..52c648f3 100644 --- a/test/dummy/db/schema.rb +++ b/test/dummy/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[7.1].define(version: 2024_01_10_143450) do +ActiveRecord::Schema[7.1].define(version: 2024_02_18_110712) do create_table "job_results", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| t.string "queue_name" t.string "status" @@ -92,6 +92,15 @@ t.index ["queue_name", "priority", "job_id"], name: "index_solid_queue_poll_by_queue" end + create_table "solid_queue_recurring_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.bigint "job_id", null: false + t.string "task_key", null: false + t.datetime "run_at", null: false + t.datetime "created_at", null: false + t.index ["job_id"], name: "index_solid_queue_recurring_executions_on_job_id", unique: true + t.index ["task_key", "run_at"], name: "index_solid_queue_recurring_executions_on_task_key_and_run_at", unique: true + end + create_table "solid_queue_scheduled_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| t.bigint "job_id", null: false t.string "queue_name", null: false @@ -117,5 +126,6 @@ add_foreign_key "solid_queue_claimed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade add_foreign_key "solid_queue_failed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade add_foreign_key "solid_queue_ready_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_recurring_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade add_foreign_key "solid_queue_scheduled_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade end diff --git a/test/fixtures/solid_queue/recurring_executions.yml b/test/fixtures/solid_queue/recurring_executions.yml new file mode 100644 index 00000000..e69de29b diff --git a/test/integration/recurring_tasks_test.rb b/test/integration/recurring_tasks_test.rb new file mode 100644 index 00000000..f6d6576f --- /dev/null +++ b/test/integration/recurring_tasks_test.rb @@ -0,0 +1,46 @@ +# frozen_string_literal: true +require "test_helper" + +class RecurringTasksTest < ActiveSupport::TestCase + self.use_transactional_tests = false + + setup do + @pid = run_supervisor_as_fork(mode: :all) + # 1 supervisor + 2 workers + 1 dispatcher + wait_for_registered_processes(4, timeout: 3.second) + end + + teardown do + terminate_process(@pid) if process_exists?(@pid) + + SolidQueue::Process.destroy_all + SolidQueue::Job.destroy_all + JobResult.delete_all + end + + test "enqueue and process periodic tasks" do + wait_for_jobs_to_be_enqueued(2, timeout: 2.seconds) + wait_for_jobs_to_finish_for(2.seconds) + + terminate_process(@pid) + + skip_active_record_query_cache do + assert SolidQueue::Job.count >= 2 + SolidQueue::Job.all.each do |job| + assert_equal "periodic_store_result", job.recurring_execution.task_key + assert_equal "StoreResultJob", job.class_name + end + + assert_equal 2, JobResult.count + JobResult.all.each do |result| + assert_equal "custom_status", result.status + assert_equal "42", result.value + end + end + end + + private + def wait_for_jobs_to_be_enqueued(count, timeout: 1.second) + wait_while_with_timeout(timeout) { SolidQueue::Job.count < count } + end +end diff --git a/test/unit/configuration_test.rb b/test/unit/configuration_test.rb index 4c974f6e..85f5f288 100644 --- a/test/unit/configuration_test.rb +++ b/test/unit/configuration_test.rb @@ -24,10 +24,15 @@ class ConfigurationTest < ActiveSupport::TestCase test "provide configuration as a hash and fill defaults" do background_worker = { queues: "background", polling_interval: 10 } - config_as_hash = { workers: [ background_worker, background_worker ] } + dispatcher = { batch_size: 100 } + config_as_hash = { workers: [ background_worker, background_worker ], dispatchers: [ dispatcher ] } configuration = SolidQueue::Configuration.new(mode: :all, load_from: config_as_hash) - assert_equal SolidQueue::Configuration::DISPATCHER_DEFAULTS[:polling_interval], configuration.dispatchers.first.polling_interval + assert_equal 1, configuration.dispatchers.count + dispatcher = configuration.dispatchers.first + assert_equal SolidQueue::Configuration::DISPATCHER_DEFAULTS[:polling_interval], dispatcher.polling_interval + assert_equal SolidQueue::Configuration::DISPATCHER_DEFAULTS[:concurrency_maintenance_interval], dispatcher.concurrency_maintenance.interval + assert_equal 2, configuration.workers.count assert_equal [ "background" ], configuration.workers.flat_map(&:queues).uniq assert_equal [ 10 ], configuration.workers.map(&:polling_interval).uniq diff --git a/test/unit/dispatcher_test.rb b/test/unit/dispatcher_test.rb index 52ebfa7a..6f06b7c8 100644 --- a/test/unit/dispatcher_test.rb +++ b/test/unit/dispatcher_test.rb @@ -12,15 +12,49 @@ class DispatcherTest < ActiveSupport::TestCase teardown do @dispatcher.stop + SolidQueue::Job.delete_all + SolidQueue::Process.delete_all end test "dispatcher is registered as process" do @dispatcher.start wait_for_registered_processes(1, timeout: 1.second) + process = SolidQueue::Process.first + assert_equal "Dispatcher", process.kind + assert_equal({ "polling_interval" => 0.1, "batch_size" => 10, "concurrency_maintenance_interval" => 600 }, process.metadata) + end + + test "concurrency maintenance is optional" do + no_concurrency_maintenance_dispatcher = SolidQueue::Dispatcher.new(polling_interval: 0.1, batch_size: 10, concurrency_maintenance: false) + no_concurrency_maintenance_dispatcher.start + + wait_for_registered_processes(1, timeout: 1.second) + process = SolidQueue::Process.first assert_equal "Dispatcher", process.kind assert_equal({ "polling_interval" => 0.1, "batch_size" => 10 }, process.metadata) + + ensure + no_concurrency_maintenance_dispatcher.stop + end + + test "recurring schedule" do + recurring_task = { example_task: { class: "AddToBufferJob", schedule: "every hour", args: 42 } } + with_recurring_schedule = SolidQueue::Dispatcher.new(concurrency_maintenance: false, recurring_tasks: recurring_task) + + with_recurring_schedule.start + + wait_for_registered_processes(1, timeout: 1.second) + + process = SolidQueue::Process.first + assert_equal "Dispatcher", process.kind + + schedule_from_metadata = process.metadata["recurring_schedule"] + assert_equal 1, schedule_from_metadata.size + assert_equal({ "class_name" => "AddToBufferJob", "schedule" => "every hour", "arguments" => [ 42 ] }, schedule_from_metadata["example_task"]) + ensure + with_recurring_schedule.stop end test "polling queries are logged" do @@ -51,7 +85,7 @@ class DispatcherTest < ActiveSupport::TestCase SolidQueue.silence_polling = old_silence_polling end - test "run more than one instance of the dispatcher" do + test "run more than one instance of the dispatcher without recurring tasks" do 15.times do AddToBufferJob.set(wait: 0.2).perform_later("I'm scheduled") end @@ -67,6 +101,26 @@ class DispatcherTest < ActiveSupport::TestCase assert_equal 0, SolidQueue::ScheduledExecution.count assert_equal 15, SolidQueue::ReadyExecution.count + ensure another_dispatcher.stop end + + test "run more than one instance of the dispatcher with recurring tasks" do + recurring_task = { example_task: { class: "AddToBufferJob", schedule: "every second", args: 42 } } + dispatchers = 2.times.collect do + SolidQueue::Dispatcher.new(concurrency_maintenance: false, recurring_tasks: recurring_task) + end + + dispatchers.each(&:start) + sleep 2 + dispatchers.each(&:stop) + + assert_equal SolidQueue::Job.count, SolidQueue::RecurringExecution.count + assert SolidQueue::Job.count < 4 + + run_at_times = SolidQueue::RecurringExecution.all.map(&:run_at).sort + 0.upto(run_at_times.length - 2) do |i| + assert_equal 1, run_at_times[i + 1] - run_at_times[i] + end + end end diff --git a/test/unit/recurring_task_test.rb b/test/unit/recurring_task_test.rb new file mode 100644 index 00000000..76b42060 --- /dev/null +++ b/test/unit/recurring_task_test.rb @@ -0,0 +1,96 @@ +require "test_helper" + +class RecurringTaskTest < ActiveSupport::TestCase + class JobWithoutArguments < ApplicationJob + def perform + JobBuffer.add "job_without_arguments" + end + end + + class JobWithRegularHashArguments < ApplicationJob + def perform(value, options) + JobBuffer.add [ value, options[:value] ] + end + end + + class JobWithKeywordArgument < ApplicationJob + def perform(value, value_kwarg:) + JobBuffer.add [ value, value_kwarg ] + end + end + + class JobWithMultipleTypeArguments < ApplicationJob + def perform(value, options = {}, **kwargs) + JobBuffer.add [ value, options[:value], kwargs[:value_kwarg] ] + end + end + + setup do + @worker = SolidQueue::Worker.new(queues: "*") + @worker.mode = :inline + end + + test "job without arguments" do + task = recurring_task_with(class_name: "JobWithoutArguments") + enqueue_and_assert_performed_with_result task, "job_without_arguments" + end + + test "job with regular hash argument" do + task = recurring_task_with(class_name: "JobWithRegularHashArguments", args: [ "regular_hash_argument", { value: 42, not_used: 24 } ]) + + enqueue_and_assert_performed_with_result task, [ "regular_hash_argument", 42 ] + end + + test "job with keyword argument" do + task = recurring_task_with(class_name: "JobWithKeywordArgument", args: [ "keyword_argument", { value_kwarg: [ 42, 24 ] } ]) + enqueue_and_assert_performed_with_result task, [ "keyword_argument", [ 42, 24 ] ] + end + + test "job with arguments of multiple types" do + task = recurring_task_with(class_name: "JobWithMultipleTypeArguments", args: + [ "multiple_types", { value: "regular_hash_value", not_used: 28 }, value_kwarg: 42, not_used: 32 ]) + enqueue_and_assert_performed_with_result task, [ "multiple_types", "regular_hash_value", 42 ] + end + + test "job with arguments of multiple types ignoring optional regular hash" do + task = recurring_task_with(class_name: "JobWithMultipleTypeArguments", args: + [ "multiple_types", value: "regular_hash_value", value_kwarg: 42, not_used: 32 ]) + enqueue_and_assert_performed_with_result task, [ "multiple_types", nil, 42 ] + end + + test "valid and invalid schedules" do + assert_not recurring_task_with(class_name: "JobWithoutArguments", schedule: "once a year").valid? + assert_not recurring_task_with(class_name: "JobWithoutArguments", schedule: "tomorrow").valid? + + task = recurring_task_with(class_name: "JobWithoutArguments", schedule: "every Thursday at 1 AM") + assert task.valid? + # At 1 AM on the 4th day of the week + assert task.to_s.ends_with? "[ 0 1 * * 4 ]" + + task = recurring_task_with(class_name: "JobWithoutArguments", schedule: "every month") + assert task.valid? + # At 12:00 AM, on day 1 of the month + assert task.to_s.ends_with? "[ 0 0 1 * * ]" + + task = recurring_task_with(class_name: "JobWithoutArguments", schedule: "every second") + assert task.valid? + assert task.to_s.ends_with? "[ * * * * * * ]" + end + + private + def enqueue_and_assert_performed_with_result(task, result) + assert_difference [ -> { SolidQueue::Job.count }, -> { SolidQueue::ReadyExecution.count } ], +1 do + task.enqueue(at: Time.now) + end + + assert_difference -> { JobBuffer.size }, +1 do + @worker.start + end + + assert_equal result, JobBuffer.last_value + end + + def recurring_task_with(class_name:, schedule: "every hour", args: nil) + SolidQueue::Dispatcher::RecurringTask.from_configuration("task-id", class: "RecurringTaskTest::#{class_name}", schedule: schedule, args: args) + end +end