From 94a0d59bfbf644026d54800f4c6556292298e213 Mon Sep 17 00:00:00 2001 From: "Hal M. Spitz" Date: Sun, 17 Nov 2024 15:20:07 -0800 Subject: [PATCH 1/3] Silence test output for expected exceptions SolidQueue has excellent built-in error reporting. While this is fantastic for SQ users, it is less than ideal for testing SolidQueue because any test that deliberately uses or triggers an exception produces voluminous error reporting. This error reporting is hugely valuable when the exception is not expected, but distracting and of limited value for expected use-cases, especially when the test confirms the correct outcomes via assertions. This commit adds: * A generic test-specific Exception class: ExpectedTestError This allows testing for specific exceptions while retaining all error reporting infrastructure for unexpected exceptions. * Two helper methods for silencing on_thread_error output These methods accept an Exception or Array(Exception) and simply do not call the output mechanism if the exception passed to on_thread_error matches. This way, any unexpected error during a test is still reported in a highly visible manner while the exceptions being tested are validated via assertions. * Replaces the stock on_thread_error with one that ignores ExpectedTestError. Updated several tests from using Ruby's stock RuntimeError to ExpectedTestError. * Configures tests to run with YJIT enabled This is to test under the likely production deployment configuration, not for performance reasons. Note: Given the very recent reports of M4 Macs crashing with YJIT enabled, we might want to either defer this change or add a conditional to opt in until the problem is resolved.
--- Gemfile.lock | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Gemfile.lock b/Gemfile.lock index 2df57956..79544594 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -130,6 +130,8 @@ GEM rake (13.2.1) rdoc (6.8.1) psych (>= 4.0.0) + rdoc (6.6.3.1) + psych (>= 4.0.0) regexp_parser (2.9.2) reline (0.5.12) io-console (~> 0.5) From 5d4021784ccf703b36e11fa1a79d11c45716af32 Mon Sep 17 00:00:00 2001 From: "Hal M. Spitz" Date: Fri, 22 Nov 2024 17:00:55 -0800 Subject: [PATCH 2/3] Upgrade Worker.pool to Promises.future * Upgrade Worker.pool from Future to Promises.future Promises.future leverages an improved, non-blocking, and lock-free implementation of Concurrent Ruby's Async runtime, enhancing performance and future feature compatibility. Promises.future was moved from edge to main in V1.1 (2018). * Replace ClaimedExecution::Result with Concurrent::Maybe `Result` was underutilized and essentially mirrored `Maybe`'s functionality. This change simplifies code and reduces redundancy by leveraging the `Concurrent::Maybe` class. * Centralize error reporting in AppExecutor.handle_thread_error This change was necessitated by the change to Promises.future. Concurrent Ruby has some very strong ideas about exceptions within a future. With a little code rearranging, this change pulls the error / exception reporting responsibilities out of the Future execution code path and pushes them to AppExecutor#handle_thread_error. This change ensures that `Rails.error` is called exactly once per `handle_thread_error` invocation, regardless of whether on_thread_error calls `Rails.error` or not.
* Update tests to accommodate these changes --- app/models/solid_queue/claimed_execution.rb | 19 +++------- lib/solid_queue.rb | 1 + lib/solid_queue/app_executor.rb | 35 ++++++++++++++++--- lib/solid_queue/engine.rb | 2 +- lib/solid_queue/log_subscriber.rb | 2 +- lib/solid_queue/pool.rb | 12 +++---- test/integration/instrumentation_test.rb | 5 +-- .../solid_queue/claimed_execution_test.rb | 12 +++---- test/unit/worker_test.rb | 2 +- 9 files changed, 51 insertions(+), 39 deletions(-) diff --git a/app/models/solid_queue/claimed_execution.rb b/app/models/solid_queue/claimed_execution.rb index c2b13909..cdffb489 100644 --- a/app/models/solid_queue/claimed_execution.rb +++ b/app/models/solid_queue/claimed_execution.rb @@ -5,12 +5,6 @@ class SolidQueue::ClaimedExecution < SolidQueue::Execution scope :orphaned, -> { where.missing(:process) } - class Result < Struct.new(:success, :error) - def success? - success - end - end - class << self def claiming(job_ids, process_id, &block) job_data = Array(job_ids).collect { |job_id| { job_id: job_id, process_id: process_id } } @@ -60,12 +54,9 @@ def discard_all_from_jobs(*) def perform result = execute - if result.success? - finished - else - failed_with(result.error) - raise result.error - end + result.just? ? 
finished : failed_with(result.reason) + + result ensure job.unblock_next_blocked_job end @@ -93,9 +84,9 @@ def failed_with(error) private def execute ActiveJob::Base.execute(job.arguments) - Result.new(true, nil) + Concurrent::Maybe.just(true) rescue Exception => e - Result.new(false, e) + Concurrent::Maybe.nothing(e) end def finished diff --git a/lib/solid_queue.rb b/lib/solid_queue.rb index e7070d26..4715136c 100644 --- a/lib/solid_queue.rb +++ b/lib/solid_queue.rb @@ -40,6 +40,7 @@ module SolidQueue mattr_accessor :preserve_finished_jobs, default: true mattr_accessor :clear_finished_jobs_after, default: 1.day mattr_accessor :default_concurrency_control_period, default: 3.minutes + mattr_accessor :reporting_label, default: "SolidQueue-#{SolidQueue::VERSION}" delegate :on_start, :on_stop, to: Supervisor diff --git a/lib/solid_queue/app_executor.rb b/lib/solid_queue/app_executor.rb index da0976fe..3873ac03 100644 --- a/lib/solid_queue/app_executor.rb +++ b/lib/solid_queue/app_executor.rb @@ -11,11 +11,38 @@ def wrap_in_app_executor(&block) end def handle_thread_error(error) - SolidQueue.instrument(:thread_error, error: error) + CallErrorReporters.new(error).call + end + + private + + # Handles error reporting and guarantees that Rails.error will be called if configured. + # + # This method performs the following actions: + # 1. Invokes `SolidQueue.instrument` for `:thread_error`. + # 2. Invokes `SolidQueue.on_thread_error` if it is configured. + # 3. Invokes `Rails.error.report` if it wasn't invoked by one of the above calls. + class CallErrorReporters + # @param [Exception] error The error to be reported. 
+ def initialize(error) + @error = error + @reported = false + end + + def call + SolidQueue.instrument(:thread_error, error: @error) + Rails.error.subscribe(self) if Rails.error&.respond_to?(:subscribe) - if SolidQueue.on_thread_error - SolidQueue.on_thread_error.call(error) + SolidQueue.on_thread_error&.call(@error) + + Rails.error.report(@error, handled: false, source: SolidQueue.reporting_label) unless @reported + ensure + Rails.error.unsubscribe(self) if Rails.error&.respond_to?(:unsubscribe) + end + + def report(*, **) + @reported = true + end end - end end end diff --git a/lib/solid_queue/engine.rb b/lib/solid_queue/engine.rb index d10997c7..7d478646 100644 --- a/lib/solid_queue/engine.rb +++ b/lib/solid_queue/engine.rb @@ -18,7 +18,7 @@ class Engine < ::Rails::Engine initializer "solid_queue.app_executor", before: :run_prepare_callbacks do |app| config.solid_queue.app_executor ||= app.executor - config.solid_queue.on_thread_error ||= ->(exception) { Rails.error.report(exception, handled: false) } + config.solid_queue.on_thread_error ||= ->(exception) { Rails.error.report(exception, handled: false, source: SolidQueue.reporting_label) } SolidQueue.app_executor = config.solid_queue.app_executor SolidQueue.on_thread_error = config.solid_queue.on_thread_error diff --git a/lib/solid_queue/log_subscriber.rb b/lib/solid_queue/log_subscriber.rb index 3d2ec02c..f17cf9cb 100644 --- a/lib/solid_queue/log_subscriber.rb +++ b/lib/solid_queue/log_subscriber.rb @@ -162,7 +162,7 @@ def replace_fork(event) private def formatted_event(event, action:, **attributes) - "SolidQueue-#{SolidQueue::VERSION} #{action} (#{event.duration.round(1)}ms) #{formatted_attributes(**attributes)}" + "#{SolidQueue.reporting_label} #{action} (#{event.duration.round(1)}ms) #{formatted_attributes(**attributes)}" end def formatted_attributes(**attributes) diff --git a/lib/solid_queue/pool.rb b/lib/solid_queue/pool.rb index c1bcf195..a703c3d7 100644 --- a/lib/solid_queue/pool.rb +++ 
b/lib/solid_queue/pool.rb @@ -18,20 +18,16 @@ def initialize(size, on_idle: nil) def post(execution) available_threads.decrement - future = Concurrent::Future.new(args: [ execution ], executor: executor) do |thread_execution| + Concurrent::Promises.future_on(executor, execution) do |thread_execution| wrap_in_app_executor do - thread_execution.perform + result = thread_execution.perform + + handle_thread_error(result.reason) if result.rejected? ensure available_threads.increment mutex.synchronize { on_idle.try(:call) if idle? } end end - - future.add_observer do |_, _, error| - handle_thread_error(error) if error - end - - future.execute end def idle_threads diff --git a/test/integration/instrumentation_test.rb b/test/integration/instrumentation_test.rb index 59443ccf..ec8913d1 100644 --- a/test/integration/instrumentation_test.rb +++ b/test/integration/instrumentation_test.rb @@ -391,9 +391,10 @@ class InstrumentationTest < ActiveSupport::TestCase test "thread errors emit thread_error events" do previous_thread_report_on_exception, Thread.report_on_exception = Thread.report_on_exception, false - error = ExpectedTestError.new("everything is broken") - SolidQueue::ClaimedExecution::Result.expects(:new).raises(error).at_least_once + + # Allows the job to process normally, but trigger the error path in ClaimedExecution.execute + Concurrent::Maybe.expects(:just).returns(Concurrent::Maybe.nothing(error)) AddToBufferJob.perform_later "hey!" 
diff --git a/test/models/solid_queue/claimed_execution_test.rb b/test/models/solid_queue/claimed_execution_test.rb index 4e99fd04..226dad77 100644 --- a/test/models/solid_queue/claimed_execution_test.rb +++ b/test/models/solid_queue/claimed_execution_test.rb @@ -22,9 +22,7 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase job = claimed_execution.job assert_difference -> { SolidQueue::ClaimedExecution.count } => -1, -> { SolidQueue::FailedExecution.count } => 1 do - assert_raises RuntimeError do - claimed_execution.perform - end + claimed_execution.perform end assert_not job.reload.finished? @@ -39,12 +37,10 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase test "job failures are reported via Rails error subscriber" do subscriber = ErrorBuffer.new - assert_raises RuntimeError do - with_error_subscriber(subscriber) do - claimed_execution = prepare_and_claim_job RaisingJob.perform_later(RuntimeError, "B") + with_error_subscriber(subscriber) do + claimed_execution = prepare_and_claim_job RaisingJob.perform_later(RuntimeError, "B") - claimed_execution.perform - end + claimed_execution.perform end assert_equal 1, subscriber.errors.count diff --git a/test/unit/worker_test.rb b/test/unit/worker_test.rb index d511cf74..f0b8f5fc 100644 --- a/test/unit/worker_test.rb +++ b/test/unit/worker_test.rb @@ -51,7 +51,7 @@ class WorkerTest < ActiveSupport::TestCase subscriber = ErrorBuffer.new Rails.error.subscribe(subscriber) - SolidQueue::ClaimedExecution::Result.expects(:new).raises(ExpectedTestError.new("everything is broken")).at_least_once + Concurrent::Maybe.expects(:just).returns(Concurrent::Maybe.nothing(ExpectedTestError.new("everything is broken"))) AddToBufferJob.perform_later "hey!" From 43f4de8d63cc889407f4bc10d5be7d8e996bae87 Mon Sep 17 00:00:00 2001 From: "Hal M. 
Spitz" Date: Sun, 24 Nov 2024 14:35:10 -0800 Subject: [PATCH 3/3] Use Concurrent::Promises-based TimerTask Promises are the recommended infrastructure, replacing several OG APIs, including TimerTasks. SQ only uses TimerTasks in 3 places (currently) and a very small subset of their overall functionality. SolidQueue::TimerTask is a drop-in replacement. This PR uses AtomicBoolean instead of the recommended Concurrent::Cancellation to avoid a dependency on, and the potential API stability issues of, edge features. This completes the move from the old APIs to Promises and makes all of the new concurrency features (Actors, Channels, etc.) available for future SQ features and enhancements. --- Gemfile.lock | 2 - .../dispatcher/concurrency_maintenance.rb | 8 +- lib/solid_queue/processes/registrable.rb | 8 +- lib/solid_queue/supervisor/maintenance.rb | 9 +-- lib/solid_queue/timer_task.rb | 42 ++++++++++ test/unit/timer_task_test.rb | 79 +++++++++++++++++++ 6 files changed, 125 insertions(+), 23 deletions(-) create mode 100644 lib/solid_queue/timer_task.rb create mode 100644 test/unit/timer_task_test.rb diff --git a/Gemfile.lock b/Gemfile.lock index 79544594..2df57956 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -130,8 +130,6 @@ GEM rake (13.2.1) rdoc (6.8.1) psych (>= 4.0.0) - rdoc (6.6.3.1) - psych (>= 4.0.0) regexp_parser (2.9.2) reline (0.5.12) io-console (~> 0.5) diff --git a/lib/solid_queue/dispatcher/concurrency_maintenance.rb b/lib/solid_queue/dispatcher/concurrency_maintenance.rb index 81cf770c..fd0073d4 100644 --- a/lib/solid_queue/dispatcher/concurrency_maintenance.rb +++ b/lib/solid_queue/dispatcher/concurrency_maintenance.rb @@ -12,16 +12,10 @@ def initialize(interval, batch_size) end def start - @concurrency_maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: interval) do + @concurrency_maintenance_task = SolidQueue::TimerTask.new(run_now: true, execution_interval: interval) do expire_semaphores unblock_blocked_executions end - - 
@concurrency_maintenance_task.add_observer do |_, _, error| - handle_thread_error(error) if error - end - - @concurrency_maintenance_task.execute end def stop diff --git a/lib/solid_queue/processes/registrable.rb b/lib/solid_queue/processes/registrable.rb index 58cabfa8..9c8e33cf 100644 --- a/lib/solid_queue/processes/registrable.rb +++ b/lib/solid_queue/processes/registrable.rb @@ -37,15 +37,9 @@ def registered? end def launch_heartbeat - @heartbeat_task = Concurrent::TimerTask.new(execution_interval: SolidQueue.process_heartbeat_interval) do + @heartbeat_task = SolidQueue::TimerTask.new(execution_interval: SolidQueue.process_heartbeat_interval) do wrap_in_app_executor { heartbeat } end - - @heartbeat_task.add_observer do |_, _, error| - handle_thread_error(error) if error - end - - @heartbeat_task.execute end def stop_heartbeat diff --git a/lib/solid_queue/supervisor/maintenance.rb b/lib/solid_queue/supervisor/maintenance.rb index 1b6b5204..48808d7c 100644 --- a/lib/solid_queue/supervisor/maintenance.rb +++ b/lib/solid_queue/supervisor/maintenance.rb @@ -7,16 +7,11 @@ module Supervisor::Maintenance end private + def launch_maintenance_task - @maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: SolidQueue.process_alive_threshold) do + @maintenance_task = SolidQueue::TimerTask.new(run_now: true, execution_interval: SolidQueue.process_alive_threshold) do prune_dead_processes end - - @maintenance_task.add_observer do |_, _, error| - handle_thread_error(error) if error - end - - @maintenance_task.execute end def stop_maintenance_task diff --git a/lib/solid_queue/timer_task.rb b/lib/solid_queue/timer_task.rb new file mode 100644 index 00000000..9a3fcec4 --- /dev/null +++ b/lib/solid_queue/timer_task.rb @@ -0,0 +1,42 @@ +# frozen_string_literal: true + +module SolidQueue + class TimerTask + include AppExecutor + + def initialize(execution_interval:, run_now: false, &block) + raise ArgumentError, "A block is required" unless block_given? 
+ @shutdown = Concurrent::AtomicBoolean.new + + run(run_now, execution_interval, &block) + end + + def shutdown + @shutdown.make_true + end + + private + + def run(run_now, execution_interval, &block) + execute_task(&block) if run_now + + Concurrent::Promises.future(execution_interval) do |interval| + repeating_task(interval, &block) + end.run + end + + def execute_task(&block) + block.call unless @shutdown.true? + rescue Exception => e + handle_thread_error(e) + end + + def repeating_task(interval, &block) + Concurrent::Promises.schedule(interval) do + execute_task(&block) + end.then do + repeating_task(interval, &block) unless @shutdown.true? + end + end + end +end diff --git a/test/unit/timer_task_test.rb b/test/unit/timer_task_test.rb new file mode 100644 index 00000000..b3a9b11b --- /dev/null +++ b/test/unit/timer_task_test.rb @@ -0,0 +1,79 @@ +# frozen_string_literal: true + +require "test_helper" +require "mocha/minitest" + +class TimerTaskTest < ActiveSupport::TestCase + test "initialization requires a block" do + assert_raises(ArgumentError) do + SolidQueue::TimerTask.new(execution_interval: 1) + end + end + + test "task runs immediate when run now true" do + executed = false + + task = SolidQueue::TimerTask.new(run_now: true, execution_interval: 1) do + executed = true + end + + sleep 0.1 + + assert executed, "Task should have executed immediately" + task.shutdown + end + + test "task does not run immediately when run with run_now false" do + executed = false + + task = SolidQueue::TimerTask.new(run_now: false, execution_interval: 1) do + executed = true + end + + sleep 0.1 + + assert_not executed, "Task should have executed immediately" + task.shutdown + end + + test "task repeats" do + executions = 0 + + task = SolidQueue::TimerTask.new(execution_interval: 0.1, run_now: false) do + executions += 1 + end + + sleep(0.5) # Wait to accumulate some executions + + assert executions > 3, "The block should be executed repeatedly" + + task.shutdown + end + + 
test "task stops on shutdown" do + executions = 0 + + task = SolidQueue::TimerTask.new(execution_interval: 0.1, run_now: false) { executions += 1 } + + sleep(0.3) # Let the task run a few times + + task.shutdown + + current_executions = executions + + sleep(0.5) # Ensure no more executions after shutdown + + assert_equal current_executions, executions, "The task should stop executing after shutdown" + end + + test "calls handle_thread_error if task raises" do + task = SolidQueue::TimerTask.new(execution_interval: 0.1) do + raise ExpectedTestError.new + end + task.expects(:handle_thread_error).with(instance_of(ExpectedTestError)) + + sleep(0.2) # Give some time for the task to run and handle the error + + task.shutdown + end +end