@@ -148,7 +148,7 @@ function set_worker_state(w, state)
148148end
149149
150150function check_worker_state (w:: Worker )
151- if w. state == W_CREATED
151+ if w. state === W_CREATED
152152 if ! isclusterlazy ()
153153 if PGRP. topology === :all_to_all
154154 # Since higher pids connect with lower pids, the remote worker
@@ -185,13 +185,13 @@ function exec_conn_func(w::Worker)
185185end
186186
187187function wait_for_conn (w)
188- if w. state == W_CREATED
188+ if w. state === W_CREATED
189189 timeout = worker_timeout () - (time () - w. ct_time)
190190 timeout <= 0 && error (" peer $(w. id) has not connected to $(myid ()) " )
191191
192192 @async (sleep (timeout); notify (w. c_state; all= true ))
193193 wait (w. c_state)
194- w. state == W_CREATED && error (" peer $(w. id) didn't connect to $(myid ()) within $timeout seconds" )
194+ w. state === W_CREATED && error (" peer $(w. id) didn't connect to $(myid ()) within $timeout seconds" )
195195 end
196196 nothing
197197end
@@ -626,7 +626,7 @@ function create_worker(manager, wconfig)
626626 # require the value of config.connect_at which is set only upon connection completion
627627 for jw in PGRP. workers
628628 if (jw. id != 1 ) && (jw. id < w. id)
629- (jw. state == W_CREATED) && wait (jw. c_state)
629+ (jw. state === W_CREATED) && wait (jw. c_state)
630630 push! (join_list, jw)
631631 end
632632 end
@@ -649,7 +649,7 @@ function create_worker(manager, wconfig)
649649 end
650650
651651 for wl in wlist
652- (wl. state == W_CREATED) && wait (wl. c_state)
652+ (wl. state === W_CREATED) && wait (wl. c_state)
653653 push! (join_list, wl)
654654 end
655655 end
767767mutable struct ProcessGroup
768768 name:: AbstractString
769769 workers:: Array{Any,1}
770- refs:: Dict # global references
770+ refs:: Dict{RRID,Any} # global references
771771 topology:: Symbol
772772 lazy:: Union{Bool, Nothing}
773773
@@ -851,7 +851,7 @@ function nprocs()
851851 n = length (PGRP. workers)
852852 # filter out workers in the process of being setup/shutdown.
853853 for jw in PGRP. workers
854- if ! isa (jw, LocalProcess) && (jw. state != W_CONNECTED)
854+ if ! isa (jw, LocalProcess) && (jw. state != = W_CONNECTED)
855855 n = n - 1
856856 end
857857 end
@@ -902,7 +902,7 @@ julia> procs()
902902function procs ()
903903 if myid () == 1 || (PGRP. topology === :all_to_all && ! isclusterlazy ())
904904 # filter out workers in the process of being setup/shutdown.
905- return Int[x. id for x in PGRP. workers if isa (x, LocalProcess) || (x. state == W_CONNECTED)]
905+ return Int[x. id for x in PGRP. workers if isa (x, LocalProcess) || (x. state === W_CONNECTED)]
906906 else
907907 return Int[x. id for x in PGRP. workers]
908908 end
911911function id_in_procs (id) # faster version of `id in procs()`
912912 if myid () == 1 || (PGRP. topology === :all_to_all && ! isclusterlazy ())
913913 for x in PGRP. workers
914- if (x. id:: Int ) == id && (isa (x, LocalProcess) || (x:: Worker ). state == W_CONNECTED)
914+ if (x. id:: Int ) == id && (isa (x, LocalProcess) || (x:: Worker ). state === W_CONNECTED)
915915 return true
916916 end
917917 end
@@ -933,7 +933,7 @@ Specifically all workers bound to the same ip-address as `pid` are returned.
933933"""
934934function procs (pid:: Integer )
935935 if myid () == 1
936- all_workers = [x for x in PGRP. workers if isa (x, LocalProcess) || (x. state == W_CONNECTED)]
936+ all_workers = [x for x in PGRP. workers if isa (x, LocalProcess) || (x. state === W_CONNECTED)]
937937 if (pid == 1 ) || (isa (map_pid_wrkr[pid]. manager, LocalManager))
938938 Int[x. id for x in filter (w -> (w. id== 1 ) || (isa (w. manager, LocalManager)), all_workers)]
939939 else
@@ -1040,11 +1040,11 @@ function _rmprocs(pids, waitfor)
10401040
10411041 start = time_ns ()
10421042 while (time_ns () - start) < waitfor* 1e9
1043- all (w -> w. state == W_TERMINATED, rmprocset) && break
1043+ all (w -> w. state === W_TERMINATED, rmprocset) && break
10441044 sleep (min (0.1 , waitfor - (time_ns () - start)/ 1e9 ))
10451045 end
10461046
1047- unremoved = [wrkr. id for wrkr in filter (w -> w. state != W_TERMINATED, rmprocset)]
1047+ unremoved = [wrkr. id for wrkr in filter (w -> w. state != = W_TERMINATED, rmprocset)]
10481048 if length (unremoved) > 0
10491049 estr = string (" rmprocs: pids " , unremoved, " not terminated after " , waitfor, " seconds." )
10501050 throw (ErrorException (estr))
0 commit comments