Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions History.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
Version 2.1.0
=====

Add support for Resque Hooks

Version 2.0.0
=====

Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.0.7
2.1.0
76 changes: 65 additions & 11 deletions lib/resque/plugins/multi_step_task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,43 @@ def find(task_id)

mst = new(task_id)
end

def perform task_id, job_module_name, *args
job_type = constantize job_module_name
before_hooks( job_type ).each do |hook|
job_type.send( hook, *args )
end
if around_hooks( job_type ).empty?
perform_without_hooks task_id, job_module_name, *args
else
stack = around_hooks( job_type ).reverse.inject( nil ) do | last_hook, hook |
if last_hook
lambda do
job_type.send( hook, *args ) { last_hook.call }
end
else
lambda do
job_type.send( hook, *args ) do
perform_without_hooks task_id, job_module_name, *args
end
end
end
end
stack.call
end
after_hooks( job_type ).each do |hook|
job_type.send( hook, *args )
end
end

%W{ before around after }.each do |name|
define_method "#{name}_hooks" do |job_type|
Resque::Plugin.send "#{name}_hooks", job_type
end
end

# Handle job invocation
def perform(task_id, job_module_name, *args)
def perform_without_hooks(task_id, job_module_name, *args)
task = perform_without_maybe_finalize(task_id, job_module_name, *args)
task.maybe_finalize
end
Expand Down Expand Up @@ -215,13 +249,15 @@ def queue_name
#
# @param [Class,Module] job_type The type of the job to be performed.
def add_job(job_type, *args)
logger.info("[Resque Multi-Step-Task] Incrementing normal_job_count: #{job_type} job added to task id #{task_id} at #{Time.now} (args: #{args})")

increment_normal_job_count
logger.debug("[Resque Multi-Step-Task] Adding #{job_type} job for #{task_id} (args: #{args})")
with_enque_hooks job_type, *args do
logger.info("[Resque Multi-Step-Task] Incrementing normal_job_count: #{job_type} job added to task id #{task_id} at #{Time.now} (args: #{args})")

increment_normal_job_count
logger.debug("[Resque Multi-Step-Task] Adding #{job_type} job for #{task_id} (args: #{args})")

redis.rpush 'normal_jobs', Yajl::Encoder.encode([job_type.to_s, *args])
run_job job_type, *args if started
redis.rpush 'normal_jobs', Yajl::Encoder.encode([job_type.to_s, *args])
run_job job_type, *args if started
end
end

# Finalization jobs are performed after all the normal jobs
Expand All @@ -230,11 +266,13 @@ def add_job(job_type, *args)
#
# @param [Class,Module] job_type The type of job to be performed.
def add_finalization_job(job_type, *args)
logger.info("[Resque Multi-Step-Task] Incrementing finalize_job_count: Finalization job #{job_type} for task id #{task_id} at #{Time.now} (args: #{args})")
increment_finalize_job_count
logger.debug("[Resque Multi-Step-Task] Adding #{job_type} finalization job for #{task_id} (args: #{args})")
with_enque_hooks job_type, *args do
logger.info("[Resque Multi-Step-Task] Incrementing finalize_job_count: Finalization job #{job_type} for task id #{task_id} at #{Time.now} (args: #{args})")
increment_finalize_job_count
logger.debug("[Resque Multi-Step-Task] Adding #{job_type} finalization job for #{task_id} (args: #{args})")

redis.rpush 'finalize_jobs', Yajl::Encoder.encode([job_type.to_s, *args])
redis.rpush 'finalize_jobs', Yajl::Encoder.encode([job_type.to_s, *args])
end
end

def start
Expand Down Expand Up @@ -354,6 +392,22 @@ def unfinalized_because_of_errors?
failed_count > 0 && completed_count < (normal_job_count + finalize_job_count)
end

private

def with_enque_hooks job_type, *args

before_hooks = Resque::Plugin.before_enqueue_hooks( job_type ).collect do |hook|
job_type.send hook, *args
end
return nil if before_hooks.any? { |result| result == false }

yield

Resque::Plugin.after_enqueue_hooks( job_type ).each do |hook|
job_type.send hook, *args
end
end

end
end
end
Expand Down
31 changes: 30 additions & 1 deletion lib/resque/plugins/multi_step_task/finalization_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,36 @@ class FinalizationJob
extend Constantization

# Handle job invocation
def self.perform(task_id, job_module_name, *args)
def self.perform task_id, job_module_name, *args
job_type = constantize job_module_name
MultiStepTask.before_hooks( job_type ).each do |hook|
job_type.send( hook, *args )
end
around_hooks = MultiStepTask.around_hooks( job_type )
if around_hooks.empty?
perform_without_hooks task_id, job_module_name, *args
else
stack = around_hooks.reverse.inject( nil ) do | last_hook, hook |
if last_hook
lambda do
job_type.send( hook, *args ) { last_hook.call }
end
else
lambda do
job_type.send( hook, *args ) do
perform_without_hooks task_id, job_module_name, *args
end
end
end
end
stack.call
end
MultiStepTask.after_hooks( job_type ).each do |hook|
job_type.send( hook, *args )
end
end

def self.perform_without_hooks(task_id, job_module_name, *args)
task = MultiStepTask.find(task_id)

begin
Expand Down
43 changes: 43 additions & 0 deletions spec/resque/plugins/multi_step_task/finalization_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,24 @@ class MultiStepTask
FinalizationJob.perform(task.task_id, 'TestJob', 0)
end
end


describe FinalizationJob, "performs job calling hooks" do

let(:task) { MultiStepTask.create("some-task") }

before do
MultiStepTask.stub!(:find).and_return(task)
TestJobWithHooks::DummyClass.should_receive :do_something
end

it "enqueues job with right parameters and executes after enqueue method" do
args = {}
FinalizationJob.perform(task.task_id, 'TestJobWithHooks', args)
args[ 'before_perform' ].should eq "do_something"
end

end
end
end

Expand All @@ -32,3 +50,28 @@ def self.perform(*args)
# no op
end
end

module ::TestJobWithHooks

class DummyClass;end

def self.before_enqueue_do_something args
args[ 'before_enqueue' ] = "do_something"
end

def self.after_enqueue_do_something args
DummyClass.do_something
end

def self.before_perform_do_something args
args[ 'before_perform' ] = "do_something"
end

def self.after_perform_do_something args
DummyClass.do_something
end

def self.perform(*args)
# no op
end
end
67 changes: 67 additions & 0 deletions spec/resque/plugins/multi_step_task_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,48 @@ module Resque::Plugins
task.should_not be_incomplete_because_of_errors
end
end



describe MultiStepTask, "enqueing job with hooks" do

let(:task) { MultiStepTask.create("some-task") }

before do
TestJobWithHooks::DummyClass.should_receive :do_something
end

it "enqueues job with right parameters and executes after enqueue method" do
Resque::Job.should_receive(:create).with(
task.queue_name, Resque::Plugins::MultiStepTask, task.task_id, 'TestJobWithHooks', { 'before_enqueue' => "do_something"}
)
args = {}
task.add_job TestJobWithHooks, args
end

it "enqueues finalization and executes after enqueue method" do
args = {}
task.add_finalization_job TestJobWithHooks, args
end

end

describe MultiStepTask, "performs job calling hooks" do

let(:task) { MultiStepTask.create("some-task") }

before do
TestJobWithHooks::DummyClass.should_receive :do_something
end

it "enqueues job with right parameters and executes after enqueue method" do
args = {}
MultiStepTask.perform(task.task_id, 'TestJobWithHooks', args)
args[ 'before_perform' ].should eq "do_something"
end

end

end

module ::TestJob
Expand All @@ -261,3 +303,28 @@ def self.perform(*args)
end
end

module ::TestJobWithHooks

class DummyClass;end

def self.before_enqueue_do_something args
args[ 'before_enqueue' ] = "do_something"
end

def self.after_enqueue_do_something args
DummyClass.do_something
end

def self.before_perform_do_something args
args[ 'before_perform' ] = "do_something"
end

def self.after_perform_do_something args
DummyClass.do_something
end

def self.perform(*args)
# no op
end
end