diff --git a/.gitignore b/.gitignore index 147c05f2..3cbf5eae 100644 --- a/.gitignore +++ b/.gitignore @@ -14,5 +14,8 @@ # Folder for Visual Studio Code /.vscode/ +# Files for RVM holdouts +.ruby-gemset + # misc .DS_Store diff --git a/.rubocop.yml b/.rubocop.yml index 0269c78b..999ad5f5 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -4,7 +4,7 @@ inherit_gem: { rubocop-rails-omakase: rubocop.yml } AllCops: - TargetRubyVersion: 3.0 + TargetRubyVersion: 3.3 Exclude: - "test/dummy/db/schema.rb" - "test/dummy/db/queue_schema.rb" diff --git a/Gemfile.lock b/Gemfile.lock index 8c030422..2df57956 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -54,9 +54,10 @@ GEM concurrent-ruby (1.3.4) connection_pool (2.4.1) crass (1.0.6) - debug (1.7.1) - irb (>= 1.5.0) - reline (>= 0.3.1) + date (3.4.1) + debug (1.9.2) + irb (~> 1.10) + reline (>= 0.3.8) drb (2.2.1) erubi (1.13.0) et-orbi (1.2.11) @@ -74,6 +75,7 @@ GEM reline (>= 0.4.2) json (2.8.2) language_server-protocol (3.17.0.3) + logger (1.6.2) loofah (2.23.1) crass (~> 1.0.2) nokogiri (>= 1.12.0) @@ -95,7 +97,8 @@ GEM ast (~> 2.4.1) racc pg (1.5.4) - psych (5.2.0) + psych (5.2.1) + date stringio puma (6.4.3) nio4r (~> 2.0) @@ -175,16 +178,19 @@ GEM PLATFORMS arm64-darwin-22 arm64-darwin-23 + arm64-darwin-24 x86_64-darwin-21 x86_64-darwin-23 x86_64-linux DEPENDENCIES - debug + debug (~> 1.9) + logger mocha mysql2 pg puma + rdoc rubocop-rails-omakase solid_queue! sqlite3 diff --git a/lib/solid_queue/processes/interruptible.rb b/lib/solid_queue/processes/interruptible.rb index 67173aeb..09c027b6 100644 --- a/lib/solid_queue/processes/interruptible.rb +++ b/lib/solid_queue/processes/interruptible.rb @@ -7,31 +7,23 @@ def wake_up end private - SELF_PIPE_BLOCK_SIZE = 11 def interrupt - self_pipe[:writer].write_nonblock(".") - rescue Errno::EAGAIN, Errno::EINTR - # Ignore writes that would block and retry - # if another signal arrived while writing - retry + queue << true end def interruptible_sleep(time) - if time > 0 && self_pipe[:reader].wait_readable(time) - loop { self_pipe[:reader].read_nonblock(SELF_PIPE_BLOCK_SIZE) } - end - rescue Errno::EAGAIN, Errno::EINTR + # Invoking from the main thread can result in a 35% slowdown (at least when running the test suite). + # Using some form of Async (Futures) addresses this performance issue. + Concurrent::Promises.future(time) do |timeout| + if timeout > 0 && queue.pop(timeout:) + queue.clear + end + end.value end - # Self-pipe for signal-handling (http://cr.yp.to/docs/selfpipe.html) - def self_pipe - @self_pipe ||= create_self_pipe - end - - def create_self_pipe - reader, writer = IO.pipe - { reader: reader, writer: writer } + def queue + @queue ||= Queue.new end end end diff --git a/solid_queue.gemspec b/solid_queue.gemspec index 7faeabfa..5a4b0de4 100644 --- a/solid_queue.gemspec +++ b/solid_queue.gemspec @@ -37,11 +37,13 @@ Gem::Specification.new do |spec| spec.add_dependency "fugit", "~> 1.11.0" spec.add_dependency "thor", "~> 1.3.1" - spec.add_development_dependency "debug" + spec.add_development_dependency "debug", "~> 1.9" spec.add_development_dependency "mocha" spec.add_development_dependency "puma" spec.add_development_dependency "mysql2" spec.add_development_dependency "pg" spec.add_development_dependency "sqlite3" spec.add_development_dependency "rubocop-rails-omakase" + spec.add_development_dependency "rdoc" + spec.add_development_dependency "logger" end diff --git a/test/dummy/config/initializers/enable_yjit.rb b/test/dummy/config/initializers/enable_yjit.rb new file mode 100644 index 00000000..5367a5de --- /dev/null +++ b/test/dummy/config/initializers/enable_yjit.rb @@ -0,0 +1,12 @@ +# frozen_string_literal: true + +# Ideally, tests should be configured as close to production settings as +# possible and YJIT is likely to be enabled. While it's highly unlikely +# YJIT would cause issues, enabling it confirms this assertion. +# +# Configured via initializer to align with Rails 7.1 default in gemspec +if defined?(RubyVM::YJIT.enable) + Rails.application.config.after_initialize do + RubyVM::YJIT.enable + end +end diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index e181e4ca..dbce706d 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -85,7 +85,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase test "run several jobs over the same record sequentially, with some of them failing" do ("A".."F").each_with_index do |name, i| # A, C, E will fail, for i= 0, 2, 4 - SequentialUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds, exception: (RuntimeError if i.even?)) + SequentialUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds, exception: (ExpectedTestError if i.even?)) end ("G".."K").each do |name| diff --git a/test/integration/instrumentation_test.rb b/test/integration/instrumentation_test.rb index c90d161a..59443ccf 100644 --- a/test/integration/instrumentation_test.rb +++ b/test/integration/instrumentation_test.rb @@ -162,7 +162,7 @@ class InstrumentationTest < ActiveSupport::TestCase test "errors when deregistering processes are included in deregister_process events" do previous_thread_report_on_exception, Thread.report_on_exception = Thread.report_on_exception, false - error = RuntimeError.new("everything is broken") + error = ExpectedTestError.new("everything is broken") SolidQueue::Process.any_instance.expects(:destroy!).raises(error).at_least_once events = subscribed("deregister_process.solid_queue") do @@ -182,7 +182,7 @@ class InstrumentationTest < ActiveSupport::TestCase end test "retrying failed job emits retry event" do - RaisingJob.perform_later(RuntimeError, "A") + RaisingJob.perform_later(ExpectedTestError, "A") job = SolidQueue::Job.last worker = SolidQueue::Worker.new.tap(&:start) @@ -198,7 +198,7 @@ class InstrumentationTest < ActiveSupport::TestCase end test "retrying failed jobs in bulk emits retry_all" do - 3.times { RaisingJob.perform_later(RuntimeError, "A") } + 3.times { RaisingJob.perform_later(ExpectedTestError, "A") } AddToBufferJob.perform_later("A") jobs = SolidQueue::Job.last(4) @@ -392,7 +392,7 @@ class InstrumentationTest < ActiveSupport::TestCase test "thread errors emit thread_error events" do previous_thread_report_on_exception, Thread.report_on_exception = Thread.report_on_exception, false - error = RuntimeError.new("everything is broken") + error = ExpectedTestError.new("everything is broken") SolidQueue::ClaimedExecution::Result.expects(:new).raises(error).at_least_once AddToBufferJob.perform_later "hey!" diff --git a/test/integration/jobs_lifecycle_test.rb b/test/integration/jobs_lifecycle_test.rb index e1b713ee..1740f760 100644 --- a/test/integration/jobs_lifecycle_test.rb +++ b/test/integration/jobs_lifecycle_test.rb @@ -4,11 +4,13 @@ class JobsLifecycleTest < ActiveSupport::TestCase setup do + SolidQueue.on_thread_error = silent_on_thread_error_for([ ExpectedTestError, RaisingJob::DefaultError ]) @worker = SolidQueue::Worker.new(queues: "background", threads: 3) @dispatcher = SolidQueue::Dispatcher.new(batch_size: 10, polling_interval: 0.2) end teardown do + SolidQueue.on_thread_error = @on_thread_error @worker.stop @dispatcher.stop @@ -29,8 +31,8 @@ class JobsLifecycleTest < ActiveSupport::TestCase end test "enqueue and run jobs that fail without retries" do - RaisingJob.perform_later(RuntimeError, "A") - RaisingJob.perform_later(RuntimeError, "B") + RaisingJob.perform_later(ExpectedTestError, "A") + RaisingJob.perform_later(ExpectedTestError, "B") jobs = SolidQueue::Job.last(2) @dispatcher.start @@ -38,7 +40,7 @@ class JobsLifecycleTest < ActiveSupport::TestCase wait_for_jobs_to_finish_for(3.seconds) - message = "raised RuntimeError for the 1st time" + message = "raised ExpectedTestError for the 1st time" assert_equal [ "A: #{message}", "B: #{message}" ], JobBuffer.values.sort assert_empty SolidQueue::Job.finished diff --git a/test/integration/processes_lifecycle_test.rb b/test/integration/processes_lifecycle_test.rb index 5d5c2072..b96c452d 100644 --- a/test/integration/processes_lifecycle_test.rb +++ b/test/integration/processes_lifecycle_test.rb @@ -144,11 +144,11 @@ class ProcessesLifecycleTest < ActiveSupport::TestCase test "process some jobs that raise errors" do 2.times { enqueue_store_result_job("no error", :background) } 2.times { enqueue_store_result_job("no error", :default) } - error1 = enqueue_store_result_job("error", :background, exception: RuntimeError) + error1 = enqueue_store_result_job("error", :background, exception: ExpectedTestError) enqueue_store_result_job("no error", :background, pause: 0.03) - error2 = enqueue_store_result_job("error", :background, exception: RuntimeError, pause: 0.05) + error2 = enqueue_store_result_job("error", :background, exception: ExpectedTestError, pause: 0.05) 2.times { enqueue_store_result_job("no error", :default, pause: 0.01) } - error3 = enqueue_store_result_job("error", :default, exception: RuntimeError) + error3 = enqueue_store_result_job("error", :default, exception: ExpectedTestError) wait_for_jobs_to_finish_for(2.second, except: [ error1, error2, error3 ]) diff --git a/test/models/solid_queue/failed_execution_test.rb b/test/models/solid_queue/failed_execution_test.rb index 7b142991..c2299b8a 100644 --- a/test/models/solid_queue/failed_execution_test.rb +++ b/test/models/solid_queue/failed_execution_test.rb @@ -7,7 +7,7 @@ class SolidQueue::FailedExecutionTest < ActiveSupport::TestCase end test "run job that fails" do - RaisingJob.perform_later(RuntimeError, "A") + RaisingJob.perform_later(ExpectedTestError, "A") @worker.start assert_equal 1, SolidQueue::FailedExecution.count @@ -15,15 +15,17 @@ class SolidQueue::FailedExecutionTest < ActiveSupport::TestCase end test "run job that fails with a SystemStackError (stack level too deep)" do - InfiniteRecursionJob.perform_later - @worker.start + silence_on_thread_error_for(SystemStackError) do + InfiniteRecursionJob.perform_later + @worker.start - assert_equal 1, SolidQueue::FailedExecution.count - assert SolidQueue::Job.last.failed? + assert_equal 1, SolidQueue::FailedExecution.count + assert SolidQueue::Job.last.failed? + end end test "retry failed job" do - RaisingJob.perform_later(RuntimeError, "A") + RaisingJob.perform_later(ExpectedTestError, "A") @worker.start assert_difference -> { SolidQueue::FailedExecution.count }, -1 do @@ -34,7 +36,7 @@ class SolidQueue::FailedExecutionTest < ActiveSupport::TestCase end test "retry failed jobs in bulk" do - 1.upto(5) { |i| RaisingJob.perform_later(RuntimeError, i) } + 1.upto(5) { |i| RaisingJob.perform_later(ExpectedTestError, i) } 1.upto(3) { |i| AddToBufferJob.perform_later(i) } @worker.start diff --git a/test/test_helper.rb b/test/test_helper.rb index 176cb6e1..5f1bebbb 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -24,11 +24,20 @@ def write(...) end Logger::LogDevice.prepend(BlockLogDeviceTimeoutExceptions) +class ExpectedTestError < RuntimeError; end + class ActiveSupport::TestCase include ProcessesTestHelper, JobsTestHelper + setup do + # Could be cleaner with one several minitest gems, but didn't want to add new dependency + @_on_thread_error = SolidQueue.on_thread_error + SolidQueue.on_thread_error = silent_on_thread_error_for(ExpectedTestError) + end + teardown do + SolidQueue.on_thread_error = @_on_thread_error JobBuffer.clear if SolidQueue.supervisor_pidfile && File.exist?(SolidQueue.supervisor_pidfile) @@ -69,4 +78,28 @@ def wait_while_with_timeout!(timeout, &block) def skip_active_record_query_cache(&block) SolidQueue::Record.uncached(&block) end + + # Silences specified exceptions during the execution of a block + # + # @param [Exception, Array] expected an Exception or an array of Exceptions to ignore + # @yield Executes the provided block with specified exception(s) silenced + def silence_on_thread_error_for(expected, &block) + SolidQueue.with(on_thread_error: silent_on_thread_error_for(expected)) do + block.call + end + end + + # Does not call on_thread_error for expected exceptions + # @param [Exception, Array] expected an Exception or an array of Exceptions to ignore + def silent_on_thread_error_for(expected) + current_proc = SolidQueue.on_thread_error + + ->(exception) do + expected_exceptions = Array(expected) + + unless expected_exceptions.any? { exception.instance_of?(_1) } + current_proc.call(exception) + end + end + end end diff --git a/test/unit/worker_test.rb b/test/unit/worker_test.rb index 09999808..d511cf74 100644 --- a/test/unit/worker_test.rb +++ b/test/unit/worker_test.rb @@ -28,14 +28,14 @@ class WorkerTest < ActiveSupport::TestCase original_on_thread_error, SolidQueue.on_thread_error = SolidQueue.on_thread_error, ->(error) { errors << error.message } previous_thread_report_on_exception, Thread.report_on_exception = Thread.report_on_exception, false - SolidQueue::ReadyExecution.expects(:claim).raises(RuntimeError.new("everything is broken")).at_least_once + SolidQueue::ReadyExecution.expects(:claim).raises(ExpectedTestError.new("everything is broken")).at_least_once AddToBufferJob.perform_later "hey!" worker = SolidQueue::Worker.new(queues: "background", threads: 3, polling_interval: 0.2).tap(&:start) sleep(1) - assert_raises RuntimeError do + assert_raises ExpectedTestError do worker.stop end @@ -51,7 +51,7 @@ class WorkerTest < ActiveSupport::TestCase subscriber = ErrorBuffer.new Rails.error.subscribe(subscriber) - SolidQueue::ClaimedExecution::Result.expects(:new).raises(RuntimeError.new("everything is broken")).at_least_once + SolidQueue::ClaimedExecution::Result.expects(:new).raises(ExpectedTestError.new("everything is broken")).at_least_once AddToBufferJob.perform_later "hey!" @@ -71,7 +71,7 @@ class WorkerTest < ActiveSupport::TestCase subscriber = ErrorBuffer.new Rails.error.subscribe(subscriber) - RaisingJob.perform_later(RuntimeError, "B") + RaisingJob.perform_later(ExpectedTestError, "B") @worker.start @@ -79,7 +79,7 @@ class WorkerTest < ActiveSupport::TestCase @worker.wake_up assert_equal 1, subscriber.errors.count - assert_equal "This is a RuntimeError exception", subscriber.messages.first + assert_equal "This is a ExpectedTestError exception", subscriber.messages.first ensure Rails.error.unsubscribe(subscriber) if Rails.error.respond_to?(:unsubscribe) end