diff --git a/History.txt b/History.txt index 738512a..dcf7b85 100644 --- a/History.txt +++ b/History.txt @@ -1,3 +1,8 @@ +Version 2.1.0 +===== + +Add support for Resque Hooks + Version 2.0.0 ===== diff --git a/VERSION b/VERSION index f1547e6..7ec1d6d 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.0.7 +2.1.0 diff --git a/lib/resque/plugins/multi_step_task.rb b/lib/resque/plugins/multi_step_task.rb index b2fc90a..03f310c 100644 --- a/lib/resque/plugins/multi_step_task.rb +++ b/lib/resque/plugins/multi_step_task.rb @@ -87,9 +87,43 @@ def find(task_id) mst = new(task_id) end + + def perform task_id, job_module_name, *args + job_type = constantize job_module_name + before_hooks( job_type ).each do |hook| + job_type.send( hook, *args ) + end + if around_hooks( job_type ).empty? + perform_without_hooks task_id, job_module_name, *args + else + stack = around_hooks( job_type ).reverse.inject( nil ) do | last_hook, hook | + if last_hook + lambda do + job_type.send( hook, *args ) { last_hook.call } + end + else + lambda do + job_type.send( hook, *args ) do + perform_without_hooks task_id, job_module_name, *args + end + end + end + end + stack.call + end + after_hooks( job_type ).each do |hook| + job_type.send( hook, *args ) + end + end + + %W{ before around after }.each do |name| + define_method "#{name}_hooks" do |job_type| + Resque::Plugin.send "#{name}_hooks", job_type + end + end # Handle job invocation - def perform(task_id, job_module_name, *args) + def perform_without_hooks(task_id, job_module_name, *args) task = perform_without_maybe_finalize(task_id, job_module_name, *args) task.maybe_finalize end @@ -215,13 +249,15 @@ def queue_name # # @param [Class,Module] job_type The type of the job to be performed. def add_job(job_type, *args) - logger.info("[Resque Multi-Step-Task] Incrementing normal_job_count: #{job_type} job added to task id #{task_id} at #{Time.now} (args: #{args})") - - increment_normal_job_count - logger.debug("[Resque Multi-Step-Task] Adding #{job_type} job for #{task_id} (args: #{args})") + with_enque_hooks job_type, *args do + logger.info("[Resque Multi-Step-Task] Incrementing normal_job_count: #{job_type} job added to task id #{task_id} at #{Time.now} (args: #{args})") + + increment_normal_job_count + logger.debug("[Resque Multi-Step-Task] Adding #{job_type} job for #{task_id} (args: #{args})") - redis.rpush 'normal_jobs', Yajl::Encoder.encode([job_type.to_s, *args]) - run_job job_type, *args if started + redis.rpush 'normal_jobs', Yajl::Encoder.encode([job_type.to_s, *args]) + run_job job_type, *args if started + end end # Finalization jobs are performed after all the normal jobs @@ -230,11 +266,13 @@ def add_job(job_type, *args) # # @param [Class,Module] job_type The type of job to be performed. def add_finalization_job(job_type, *args) - logger.info("[Resque Multi-Step-Task] Incrementing finalize_job_count: Finalization job #{job_type} for task id #{task_id} at #{Time.now} (args: #{args})") - increment_finalize_job_count - logger.debug("[Resque Multi-Step-Task] Adding #{job_type} finalization job for #{task_id} (args: #{args})") + with_enque_hooks job_type, *args do + logger.info("[Resque Multi-Step-Task] Incrementing finalize_job_count: Finalization job #{job_type} for task id #{task_id} at #{Time.now} (args: #{args})") + increment_finalize_job_count + logger.debug("[Resque Multi-Step-Task] Adding #{job_type} finalization job for #{task_id} (args: #{args})") - redis.rpush 'finalize_jobs', Yajl::Encoder.encode([job_type.to_s, *args]) + redis.rpush 'finalize_jobs', Yajl::Encoder.encode([job_type.to_s, *args]) + end end def start @@ -354,6 +392,22 @@ def unfinalized_because_of_errors? failed_count > 0 && completed_count < (normal_job_count + finalize_job_count) end + private + + def with_enque_hooks job_type, *args + + before_hooks = Resque::Plugin.before_enqueue_hooks( job_type ).collect do |hook| + job_type.send hook, *args + end + return nil if before_hooks.any? { |result| result == false } + + yield + + Resque::Plugin.after_enqueue_hooks( job_type ).each do |hook| + job_type.send hook, *args + end + end + end end end diff --git a/lib/resque/plugins/multi_step_task/finalization_job.rb b/lib/resque/plugins/multi_step_task/finalization_job.rb index 32c8fc7..d00da9c 100644 --- a/lib/resque/plugins/multi_step_task/finalization_job.rb +++ b/lib/resque/plugins/multi_step_task/finalization_job.rb @@ -8,7 +8,36 @@ class FinalizationJob extend Constantization # Handle job invocation - def self.perform(task_id, job_module_name, *args) + def self.perform task_id, job_module_name, *args + job_type = constantize job_module_name + MultiStepTask.before_hooks( job_type ).each do |hook| + job_type.send( hook, *args ) + end + around_hooks = MultiStepTask.around_hooks( job_type ) + if around_hooks.empty? + perform_without_hooks task_id, job_module_name, *args + else + stack = around_hooks.reverse.inject( nil ) do | last_hook, hook | + if last_hook + lambda do + job_type.send( hook, *args ) { last_hook.call } + end + else + lambda do + job_type.send( hook, *args ) do + perform_without_hooks task_id, job_module_name, *args + end + end + end + end + stack.call + end + MultiStepTask.after_hooks( job_type ).each do |hook| + job_type.send( hook, *args ) + end + end + + def self.perform_without_hooks(task_id, job_module_name, *args) task = MultiStepTask.find(task_id) begin diff --git a/spec/resque/plugins/multi_step_task/finalization_job_spec.rb b/spec/resque/plugins/multi_step_task/finalization_job_spec.rb index 44d0b3a..0a307be 100644 --- a/spec/resque/plugins/multi_step_task/finalization_job_spec.rb +++ b/spec/resque/plugins/multi_step_task/finalization_job_spec.rb @@ -24,6 +24,24 @@ class MultiStepTask FinalizationJob.perform(task.task_id, 'TestJob', 0) end end + + + describe FinalizationJob, "performs job calling hooks" do + + let(:task) { MultiStepTask.create("some-task") } + + before do + MultiStepTask.stub!(:find).and_return(task) + TestJobWithHooks::DummyClass.should_receive :do_something + end + + it "enqueues job with right parameters and executes after enqueue method" do + args = {} + FinalizationJob.perform(task.task_id, 'TestJobWithHooks', args) + args[ 'before_perform' ].should eq "do_something" + end + + end end end @@ -32,3 +50,28 @@ def self.perform(*args) # no op end end + +module ::TestJobWithHooks + + class DummyClass;end + + def self.before_enqueue_do_something args + args[ 'before_enqueue' ] = "do_something" + end + + def self.after_enqueue_do_something args + DummyClass.do_something + end + + def self.before_perform_do_something args + args[ 'before_perform' ] = "do_something" + end + + def self.after_perform_do_something args + DummyClass.do_something + end + + def self.perform(*args) + # no op + end +end diff --git a/spec/resque/plugins/multi_step_task_spec.rb b/spec/resque/plugins/multi_step_task_spec.rb index be8a6a1..fbf7c53 100644 --- a/spec/resque/plugins/multi_step_task_spec.rb +++ b/spec/resque/plugins/multi_step_task_spec.rb @@ -247,6 +247,48 @@ module Resque::Plugins task.should_not be_incomplete_because_of_errors end end + + + + describe MultiStepTask, "enqueing job with hooks" do + + let(:task) { MultiStepTask.create("some-task") } + + before do + TestJobWithHooks::DummyClass.should_receive :do_something + end + + it "enqueues job with right parameters and executes after enqueue method" do + Resque::Job.should_receive(:create).with( + task.queue_name, Resque::Plugins::MultiStepTask, task.task_id, 'TestJobWithHooks', { 'before_enqueue' => "do_something"} + ) + args = {} + task.add_job TestJobWithHooks, args + end + + it "enqueues finalization and executes after enqueue method" do + args = {} + task.add_finalization_job TestJobWithHooks, args + end + + end + + describe MultiStepTask, "performs job calling hooks" do + + let(:task) { MultiStepTask.create("some-task") } + + before do + TestJobWithHooks::DummyClass.should_receive :do_something + end + + it "enqueues job with right parameters and executes after enqueue method" do + args = {} + MultiStepTask.perform(task.task_id, 'TestJobWithHooks', args) + args[ 'before_perform' ].should eq "do_something" + end + + end + end module ::TestJob @@ -261,3 +303,28 @@ def self.perform(*args) end end +module ::TestJobWithHooks + + class DummyClass;end + + def self.before_enqueue_do_something args + args[ 'before_enqueue' ] = "do_something" + end + + def self.after_enqueue_do_something args + DummyClass.do_something + end + + def self.before_perform_do_something args + args[ 'before_perform' ] = "do_something" + end + + def self.after_perform_do_something args + DummyClass.do_something + end + + def self.perform(*args) + # no op + end +end +