Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/workerpool.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ An `AbstractWorkerPool` should implement:
- [`push!`](@ref) - add a new worker to the overall pool (available + busy)
- [`put!`](@ref) - put back a worker to the available pool
- [`take!`](@ref) - take a worker from the available pool (to be used for remote function execution)
- [`wait`](@ref) - block until a worker is available
- [`length`](@ref) - number of workers available in the overall pool
- [`isready`](@ref) - return false if a `take!` on the pool would block, else true

Expand Down Expand Up @@ -120,6 +121,11 @@ function wp_local_take!(pool::AbstractWorkerPool)
return worker
end

function wp_local_wait(pool::AbstractWorkerPool)
wait(pool.channel)
return nothing
end

function remotecall_pool(rc_f, f, pool::AbstractWorkerPool, args...; kwargs...)
worker = take!(pool)
try
Expand All @@ -133,7 +139,7 @@ end
# NOTE: remotecall_fetch does it automatically, but this will be more efficient as
# it avoids the overhead associated with a local remotecall.

for (func, rt) = ((:length, Int), (:isready, Bool), (:workers, Vector{Int}), (:nworkers, Int), (:take!, Int))
for (func, rt) = ((:length, Int), (:isready, Bool), (:workers, Vector{Int}), (:nworkers, Int), (:take!, Int), (:wait, Nothing))
func_local = Symbol(string("wp_local_", func))
@eval begin
function ($func)(pool::WorkerPool)
Expand Down
21 changes: 21 additions & 0 deletions test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,27 @@ wp = WorkerPool(workers())
wp = WorkerPool(2:3)
@test sort(unique(pmap(_->myid(), wp, 1:100))) == [2,3]

# wait on worker pool
wp = WorkerPool(2:2)
w = take!(wp)

# local call to _wait
@test !isready(wp)
t = @async wait(wp)
@test !istaskdone(t)
put!(wp, w)
status = timedwait(() -> istaskdone(t), 10)
@test status == :ok

# remote call to _wait
take!(wp)
@test !isready(wp)
f = @spawnat w wait(wp)
@test !isready(f)
put!(wp, w)
status = timedwait(() -> isready(f), 10)
@test status == :ok

# CachingPool tests
wp = CachingPool(workers())
@test [1:100...] == pmap(x->x, wp, 1:100)
Expand Down