Skip to content

Commit bdc1e6c

Browse files
committed
Revert "[Distributed] make finalizer messages threadsafe (JuliaLang#42240)"
This reverts commit eb1d6b3.
1 parent 37b7a33 commit bdc1e6c

File tree

3 files changed

+30
-75
lines changed

3 files changed

+30
-75
lines changed

stdlib/Distributed/src/cluster.jl

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -95,10 +95,9 @@ end
 @enum WorkerState W_CREATED W_CONNECTED W_TERMINATING W_TERMINATED
 mutable struct Worker
     id::Int
-    msg_lock::Threads.ReentrantLock # Lock for del_msgs, add_msgs, and gcflag
-    del_msgs::Array{Any,1} # XXX: Could del_msgs and add_msgs be Channels?
+    del_msgs::Array{Any,1}
     add_msgs::Array{Any,1}
-    @atomic gcflag::Bool
+    gcflag::Bool
     state::WorkerState
     c_state::Condition # wait for state changes
     ct_time::Float64 # creation time
@@ -134,7 +133,7 @@ mutable struct Worker
         if haskey(map_pid_wrkr, id)
             return map_pid_wrkr[id]
         end
-        w=new(id, Threads.ReentrantLock(), [], [], false, W_CREATED, Condition(), time(), conn_func)
+        w=new(id, [], [], false, W_CREATED, Condition(), time(), conn_func)
         w.initialized = Event()
         register_worker(w)
         w
@@ -472,10 +471,6 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
     # The `launch` method should add an object of type WorkerConfig for every
     # worker launched. It provides information required on how to connect
     # to it.
-
-    # FIXME: launched should be a Channel, launch_ntfy should be a Threads.Condition
-    # but both are part of the public interface. This means we currently can't use
-    # `Threads.@spawn` in the code below.
     launched = WorkerConfig[]
     launch_ntfy = Condition()

stdlib/Distributed/src/messages.jl

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -126,30 +126,23 @@ function flush_gc_msgs(w::Worker)
     if !isdefined(w, :w_stream)
         return
     end
-    add_msgs = nothing
-    del_msgs = nothing
-    @lock w.msg_lock begin
-        if !w.gcflag # No work needed for this worker
-            return
-        end
-        @atomic w.gcflag = false
-        if !isempty(w.add_msgs)
-            add_msgs = w.add_msgs
-            w.add_msgs = Any[]
-        end
-
-        if !isempty(w.del_msgs)
-            del_msgs = w.del_msgs
-            w.del_msgs = Any[]
-        end
-    end
-    if add_msgs !== nothing
-        remote_do(add_clients, w, add_msgs)
+    w.gcflag = false
+    new_array = Any[]
+    msgs = w.add_msgs
+    w.add_msgs = new_array
+    if !isempty(msgs)
+        remote_do(add_clients, w, msgs)
     end
-    if del_msgs !== nothing
-        remote_do(del_clients, w, del_msgs)
+
+    # del_msgs gets populated by finalizers, so be very careful here about ordering of allocations
+    # XXX: threading requires this to be atomic
+    new_array = Any[]
+    msgs = w.del_msgs
+    w.del_msgs = new_array
+    if !isempty(msgs)
+        #print("sending delete of $msgs\n")
+        remote_do(del_clients, w, msgs)
     end
-    return
 end
 
 # Boundary inserted between messages on the wire, used for recovering
@@ -194,7 +187,7 @@ end
 function flush_gc_msgs()
     try
         for w in (PGRP::ProcessGroup).workers
-            if isa(w,Worker) && (w.state == W_CONNECTED) && w.gcflag
+            if isa(w,Worker) && w.gcflag && (w.state == W_CONNECTED)
                 flush_gc_msgs(w)
             end
         end

stdlib/Distributed/src/remotecall.jl

Lines changed: 11 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -256,27 +256,14 @@ function del_clients(pairs::Vector)
     end
 end
 
-# The task below is coalescing the `flush_gc_msgs` call
-# across multiple producers, see `send_del_client`,
-# and `send_add_client`.
-# XXX: Is this worth the additional complexity?
-# `flush_gc_msgs` has to iterate over all connected workers.
-const any_gc_flag = Threads.Condition()
+const any_gc_flag = Condition()
 function start_gc_msgs_task()
-    errormonitor(
-        Threads.@spawn begin
-            while true
-                lock(any_gc_flag) do
-                    # this might miss events
-                    wait(any_gc_flag)
-                end
-                flush_gc_msgs() # handles throws internally
-            end
-        end
-    )
+    errormonitor(@async while true
+        wait(any_gc_flag)
+        flush_gc_msgs()
+    end)
 end
 
-# Function can be called within a finalizer
 function send_del_client(rr)
     if rr.where == myid()
         del_client(rr)
@@ -294,27 +281,11 @@ function send_del_client_no_lock(rr)
     end
 end
 
-function publish_del_msg!(w::Worker, msg)
-    lock(w.msg_lock) do
-        push!(w.del_msgs, msg)
-        @atomic w.gcflag = true
-    end
-    lock(any_gc_flag) do
-        notify(any_gc_flag)
-    end
-end
-
 function process_worker(rr)
     w = worker_from_id(rr.where)::Worker
-    msg = (remoteref_id(rr), myid())
-
-    # Needs to aquire a lock on the del_msg queue
-    T = Threads.@spawn begin
-        publish_del_msg!($w, $msg)
-    end
-    Base.errormonitor(T)
-
-    return
+    push!(w.del_msgs, (remoteref_id(rr), myid()))
+    w.gcflag = true
+    notify(any_gc_flag)
 end
 
 function add_client(id, client)
@@ -339,13 +310,9 @@ function send_add_client(rr::AbstractRemoteRef, i)
         # to the processor that owns the remote ref. it will add_client
         # itself inside deserialize().
         w = worker_from_id(rr.where)
-        lock(w.msg_lock) do
-            push!(w.add_msgs, (remoteref_id(rr), i))
-            @atomic w.gcflag = true
-        end
-        lock(any_gc_flag) do
-            notify(any_gc_flag)
-        end
+        push!(w.add_msgs, (remoteref_id(rr), i))
+        w.gcflag = true
+        notify(any_gc_flag)
     end
 end

0 commit comments

Comments
 (0)