9595@enum WorkerState W_CREATED W_CONNECTED W_TERMINATING W_TERMINATED
9696mutable struct Worker
9797 id:: Int
98- msg_lock:: Threads.ReentrantLock # Lock for del_msgs, add_msgs, and gcflag
9998 del_msgs:: Array{Any,1}
10099 add_msgs:: Array{Any,1}
101100 gcflag:: Bool
102101 state:: WorkerState
103- c_state:: Threads. Condition # wait for state changes, lock for state
104- ct_time:: Float64 # creation time
105- conn_func:: Any # used to setup connections lazily
102+ c_state:: Condition # wait for state changes
103+ ct_time:: Float64 # creation time
104+ conn_func:: Any # used to setup connections lazily
106105
107106 r_stream:: IO
108107 w_stream:: IO
@@ -134,7 +133,7 @@ mutable struct Worker
134133 if haskey (map_pid_wrkr, id)
135134 return map_pid_wrkr[id]
136135 end
137- w= new (id, Threads . ReentrantLock (), [], [], false , W_CREATED, Threads . Condition (), time (), conn_func)
136+ w= new (id, [], [], false , W_CREATED, Condition (), time (), conn_func)
138137 w. initialized = Event ()
139138 register_worker (w)
140139 w
@@ -144,16 +143,12 @@ mutable struct Worker
144143end
145144
146145function set_worker_state (w, state)
147- lock (w. c_state) do
148- w. state = state
149- notify (w. c_state; all= true )
150- end
146+ w. state = state
147+ notify (w. c_state; all= true )
151148end
152149
153150function check_worker_state (w:: Worker )
154- lock (w. c_state)
155151 if w. state === W_CREATED
156- unlock (w. c_state)
157152 if ! isclusterlazy ()
158153 if PGRP. topology === :all_to_all
159154 # Since higher pids connect with lower pids, the remote worker
@@ -173,8 +168,6 @@ function check_worker_state(w::Worker)
173168 errormonitor (t)
174169 wait_for_conn (w)
175170 end
176- else
177- unlock (w. c_state)
178171 end
179172end
180173
@@ -193,25 +186,13 @@ function exec_conn_func(w::Worker)
193186end
194187
195188function wait_for_conn (w)
196- lock (w. c_state)
197189 if w. state === W_CREATED
198- unlock (w. c_state)
199190 timeout = worker_timeout () - (time () - w. ct_time)
200191 timeout <= 0 && error (" peer $(w. id) has not connected to $(myid ()) " )
201192
202- T = Threads. @spawn begin
203- sleep ($ timeout)
204- lock (w. c_state) do
205- notify (w. c_state; all= true )
206- end
207- end
208- errormonitor (T)
209- lock (w. c_state) do
210- wait (w. c_state)
211- w. state === W_CREATED && error (" peer $(w. id) didn't connect to $(myid ()) within $timeout seconds" )
212- end
213- else
214- unlock (w. c_state)
193+ @async (sleep (timeout); notify (w. c_state; all= true ))
194+ wait (w. c_state)
195+ w. state === W_CREATED && error (" peer $(w. id) didn't connect to $(myid ()) within $timeout seconds" )
215196 end
216197 nothing
217198end
@@ -490,10 +471,6 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
490471 # The `launch` method should add an object of type WorkerConfig for every
491472 # worker launched. It provides information required on how to connect
492473 # to it.
493-
494- # FIXME : launched should be a Channel, launch_ntfy should be a Threads.Condition
495- # but both are part of the public interface. This means we currently can't use
496- # `Threads.@spawn` in the code below.
497474 launched = WorkerConfig[]
498475 launch_ntfy = Condition ()
499476
@@ -506,10 +483,7 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
506483 while true
507484 if isempty (launched)
508485 istaskdone (t_launch) && break
509- @async begin
510- sleep (1 )
511- notify (launch_ntfy)
512- end
486+ @async (sleep (1 ); notify (launch_ntfy))
513487 wait (launch_ntfy)
514488 end
515489
@@ -662,12 +636,7 @@ function create_worker(manager, wconfig)
662636 # require the value of config.connect_at which is set only upon connection completion
663637 for jw in PGRP. workers
664638 if (jw. id != 1 ) && (jw. id < w. id)
665- # wait for wl to join
666- lock (jw. c_state) do
667- if jw. state === W_CREATED
668- wait (jw. c_state)
669- end
670- end
639+ (jw. state === W_CREATED) && wait (jw. c_state)
671640 push! (join_list, jw)
672641 end
673642 end
@@ -690,12 +659,7 @@ function create_worker(manager, wconfig)
690659 end
691660
692661 for wl in wlist
693- lock (wl. c_state) do
694- if wl. state === W_CREATED
695- # wait for wl to join
696- wait (wl. c_state)
697- end
698- end
662+ (wl. state === W_CREATED) && wait (wl. c_state)
699663 push! (join_list, wl)
700664 end
701665 end
@@ -712,11 +676,7 @@ function create_worker(manager, wconfig)
712676 @async manage (w. manager, w. id, w. config, :register )
713677 # wait for rr_ntfy_join with timeout
714678 timedout = false
715- @async begin
716- sleep ($ timeout)
717- timedout = true
718- put! (rr_ntfy_join, 1 )
719- end
679+ @async (sleep ($ timeout); timedout = true ; put! (rr_ntfy_join, 1 ))
720680 wait (rr_ntfy_join)
721681 if timedout
722682 error (" worker did not connect within $timeout seconds" )
0 commit comments