Skip to content

Commit 1b60be5

Browse files
exaexavchuravy
authored andcommitted
avoid using @sync_add on remotecalls (#44671)
* avoid using `@sync_add` on remotecalls It seems like @sync_add adds the Futures to a queue (Channel) for @sync, which in turn calls wait() for all the futures synchronously. Not only that is slightly detrimental for network operations (latencies add up), but in case of Distributed the call to wait() may actually cause some compilation on remote processes, which is also wait()ed for. In result, some operations took a great amount of "serial" processing time if executed on many workers at once. For me, this closes #44645. The major change can be illustrated as follows: First add some workers: ``` using Distributed addprocs(10) ``` and then trigger something that, for example, causes package imports on the workers: ``` using SomeTinyPackage ``` In my case (importing UnicodePlots on 10 workers), this improves the loading time over 10 workers from ~11s to ~5.5s. This is a far bigger issue when worker count gets high. The time of the processing on each worker is usually around 0.3s, so triggering this problem even on a relatively small cluster (64 workers) causes a really annoying delay, and running `@everywhere` for the first time on reasonable clusters (I tested with 1024 workers, see #44645) usually takes more than 5 minutes. Which sucks. Anyway, on 64 workers this reduces the "first import" time from ~30s to ~6s, and on 1024 workers this seems to reduce the time from over 5 minutes (I didn't bother to measure that precisely now, sorry) to ~11s. Related issues: - Probably fixes #39291. - #42156 is a kinda complementary -- it removes the most painful source of slowness (the 0.3s precompilation on the workers), but the fact that the wait()ing is serial remains a problem if the network latencies are high. May help with #38931 Co-authored-by: Valentin Churavy <[email protected]> (cherry picked from commit 62e0729)
1 parent 5ec2d68 commit 1b60be5

File tree

4 files changed

+41
-6
lines changed

4 files changed

+41
-6
lines changed

base/task.jl

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,12 @@ isolating the asynchronous code from changes to the variable's value in the curr
406406
Interpolating values via `\$` is available as of Julia 1.4.
407407
"""
408408
macro async(expr)
409+
do_async_macro(expr)
410+
end
411+
412+
# generate the code for @async, possibly wrapping the task in something before
413+
# pushing it to the wait queue.
414+
function do_async_macro(expr; wrap=identity)
409415
letargs = Base._lift_one_interp!(expr)
410416

411417
thunk = esc(:(()->($expr)))
@@ -414,14 +420,43 @@ macro async(expr)
414420
let $(letargs...)
415421
local task = Task($thunk)
416422
if $(Expr(:islocal, var))
417-
put!($var, task)
423+
put!($var, $(wrap(:task)))
418424
end
419425
schedule(task)
420426
task
421427
end
422428
end
423429
end
424430

431+
# task wrapper that doesn't create exceptions wrapped in TaskFailedException
432+
struct UnwrapTaskFailedException
433+
task::Task
434+
end
435+
436+
# common code for wait&fetch for UnwrapTaskFailedException
437+
function unwrap_task_failed(f::Function, t::UnwrapTaskFailedException)
438+
try
439+
f(t.task)
440+
catch ex
441+
if ex isa TaskFailedException
442+
throw(ex.task.exception)
443+
else
444+
rethrow()
445+
end
446+
end
447+
end
448+
449+
# the unwrapping for above task wrapper (gets triggered in sync_end())
450+
wait(t::UnwrapTaskFailedException) = unwrap_task_failed(wait, t)
451+
452+
# same for fetching the tasks, for convenience
453+
fetch(t::UnwrapTaskFailedException) = unwrap_task_failed(fetch, t)
454+
455+
# macro for running async code that doesn't throw wrapped exceptions
456+
macro async_unwrap(expr)
457+
do_async_macro(expr, wrap=task->:(Base.UnwrapTaskFailedException($task)))
458+
end
459+
425460
# Capture interpolated variables in $() and move them to let-block
426461
function _lift_one_interp!(e)
427462
letargs = Any[] # store the new gensymed arguments

stdlib/Distributed/src/Distributed.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import Base: getindex, wait, put!, take!, fetch, isready, push!, length,
1010
hash, ==, kill, close, isopen, showerror
1111

1212
# imports for use
13-
using Base: Process, Semaphore, JLOptions, buffer_writes, @sync_add,
13+
using Base: Process, Semaphore, JLOptions, buffer_writes, @async_unwrap,
1414
VERSION_STRING, binding_module, atexit, julia_exename,
1515
julia_cmd, AsyncGenerator, acquire, release, invokelatest,
1616
shell_escape_posixly, shell_escape_wincmd, escape_microsoft_c_args,
@@ -75,7 +75,7 @@ function _require_callback(mod::Base.PkgId)
7575
# broadcast top-level (e.g. from Main) import/using from node 1 (only)
7676
@sync for p in procs()
7777
p == 1 && continue
78-
@sync_add remotecall(p) do
78+
@async_unwrap remotecall_wait(p) do
7979
Base.require(mod)
8080
nothing
8181
end

stdlib/Distributed/src/clusterserialize.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ An exception is raised if a global constant is requested to be cleared.
243243
"""
244244
function clear!(syms, pids=workers(); mod=Main)
245245
@sync for p in pids
246-
@sync_add remotecall(clear_impl!, p, syms, mod)
246+
@async_unwrap remotecall_wait(clear_impl!, p, syms, mod)
247247
end
248248
end
249249
clear!(sym::Symbol, pid::Int; mod=Main) = clear!([sym], [pid]; mod=mod)

stdlib/Distributed/src/macros.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,10 +226,10 @@ function remotecall_eval(m::Module, procs, ex)
226226
if pid == myid()
227227
run_locally += 1
228228
else
229-
@sync_add remotecall(Core.eval, pid, m, ex)
229+
@async_unwrap remotecall_wait(Core.eval, pid, m, ex)
230230
end
231231
end
232-
yield() # ensure that the remotecall_fetch have had a chance to start
232+
yield() # ensure that the remotecalls have had a chance to start
233233

234234
# execute locally last as we do not want local execution to block serialization
235235
# of the request to remote nodes.

0 commit comments

Comments
 (0)