@@ -26,12 +26,13 @@ mutable struct Future <: AbstractRemoteRef
2626 where :: Int
2727 whence:: Int
2828 id:: Int
29- v:: Union{Some{Any}, Nothing}
29+ lock:: ReentrantLock
30+ @atomic v:: Union{Some{Any}, Nothing}
3031
3132 Future (w:: Int , rrid:: RRID , v:: Union{Some, Nothing} = nothing ) =
32- (r = new (w,rrid. whence,rrid. id,v); return test_existing_ref (r))
33+ (r = new (w,rrid. whence,rrid. id,ReentrantLock (), v); return test_existing_ref (r))
3334
34- Future (t:: NTuple{4, Any} ) = new (t[1 ],t[2 ],t[3 ],t[4 ]) # Useful for creating dummy, zeroed-out instances
35+ Future (t:: NTuple{4, Any} ) = new (t[1 ],t[2 ],t[3 ],ReentrantLock (), t[4 ]) # Useful for creating dummy, zeroed-out instances
3536end
3637
3738"""
@@ -69,10 +70,17 @@ function test_existing_ref(r::AbstractRemoteRef)
6970 found = getkey (client_refs, r, nothing )
7071 if found != = nothing
7172 @assert r. where > 0
72- if isa (r, Future) && found. v === nothing && r. v != = nothing
73- # we have recd the value from another source, probably a deserialized ref, send a del_client message
74- send_del_client (r)
75- found. v = r. v
73+ if isa (r, Future)
74+ # this is only for copying the reference from Future to RemoteRef (just created)
75+ fv_cache = @atomic :acquire found. v
76+ rv_cache = @atomic :monotonic r. v
77+ if fv_cache === nothing && rv_cache != = nothing
78+ # we have recd the value from another source, probably a deserialized ref, send a del_client message
79+ send_del_client (r)
80+ @lock found. lock begin
81+ @atomicreplace found. v nothing => rv_cache
82+ end
83+ end
7684 end
7785 return found:: typeof (r)
7886 end
@@ -91,8 +99,9 @@ function finalize_ref(r::AbstractRemoteRef)
9199 send_del_client_no_lock (r)
92100 else
93101 # send_del_client only if the reference has not been set
94- r. v === nothing && send_del_client_no_lock (r)
95- r. v = nothing
102+ v_cache = @atomic :monotonic r. v
103+ v_cache === nothing && send_del_client_no_lock (r)
104+ @atomic :monotonic r. v = nothing
96105 end
97106 r. where = 0
98107 finally
@@ -201,7 +210,8 @@ isready(f) # will not block
201210```
202211"""
203212function isready (rr:: Future )
204- rr. v === nothing || return true
213+ v_cache = @atomic rr. v
214+ v_cache === nothing || return true
205215
206216 rid = remoteref_id (rr)
207217 return if rr. where == myid ()
@@ -354,26 +364,33 @@ end
354364
355365channel_type (rr:: RemoteChannel{T} ) where {T} = T
356366
357- serialize (s:: ClusterSerializer , f:: Future ) = serialize (s, f, f. v === nothing )
358- serialize (s:: ClusterSerializer , rr:: RemoteChannel ) = serialize (s, rr, true )
359- function serialize (s:: ClusterSerializer , rr:: AbstractRemoteRef , addclient)
360- if addclient
367+ function serialize (s:: ClusterSerializer , f:: Future )
368+ v_cache = @atomic f. v
369+ if v_cache === nothing
361370 p = worker_id_from_socket (s. io)
362- (p != = rr . where) && send_add_client (rr , p)
371+ (p != = f . where) && send_add_client (f , p)
363372 end
373+ fc = Future ((f. where, f. whence, f. id, v_cache)) # copy to be used for serialization (contains a reset lock)
374+ invoke (serialize, Tuple{ClusterSerializer, Any}, s, fc)
375+ end
376+
377+ function serialize (s:: ClusterSerializer , rr:: RemoteChannel )
378+ p = worker_id_from_socket (s. io)
379+ (p != = rr. where) && send_add_client (rr, p)
364380 invoke (serialize, Tuple{ClusterSerializer, Any}, s, rr)
365381end
366382
367383function deserialize (s:: ClusterSerializer , t:: Type{<:Future} )
368- f = invoke (deserialize, Tuple{ClusterSerializer, DataType}, s, t)
369- f2 = Future (f . where, RRID (f . whence, f . id), f . v) # ctor adds to client_refs table
384+ fc = invoke (deserialize, Tuple{ClusterSerializer, DataType}, s, t) # deserialized copy
385+ f2 = Future (fc . where, RRID (fc . whence, fc . id), fc . v) # ctor adds to client_refs table
370386
371387 # 1) send_add_client() is not executed when the ref is being serialized
372388 # to where it exists, hence do it here.
373389 # 2) If we have received a 'fetch'ed Future or if the Future ctor found an
374390 # already 'fetch'ed instance in client_refs (Issue #25847), we should not
375391 # track it in the backing RemoteValue store.
376- if f2. where == myid () && f2. v === nothing
392+ f2v_cache = @atomic f2. v
393+ if f2. where == myid () && f2v_cache === nothing
377394 add_client (remoteref_id (f2), myid ())
378395 end
379396 f2
567584
568585Wait for a value to become available for the specified [`Future`](@ref).
569586"""
570- wait (r:: Future ) = (r. v != = nothing && return r; call_on_owner (wait_ref, r, myid ()); r)
587+ wait (r:: Future ) = (v_cache = @atomic r. v; v_cache != = nothing && return r; call_on_owner (wait_ref, r, myid ()); r)
571588
572589"""
573590 wait(r::RemoteChannel, args...)
@@ -584,11 +601,41 @@ Further calls to `fetch` on the same reference return the cached value. If the r
584601is an exception, throws a [`RemoteException`](@ref) which captures the remote exception and backtrace.
585602"""
586603function fetch (r:: Future )
587- r. v != = nothing && return something (r. v)
588- v = call_on_owner (fetch_ref, r)
589- r. v = Some (v)
604+ v_cache = @atomic r. v
605+ v_cache != = nothing && return something (v_cache)
606+
607+ if r. where == myid ()
608+ rv, v_cache = @lock r. lock begin
609+ v_cache = @atomic :monotonic r. v
610+ rv = v_cache === nothing ? lookup_ref (remoteref_id (r)) : nothing
611+ rv, v_cache
612+ end
613+
614+ if v_cache != = nothing
615+ return something (v_cache)
616+ else
617+ v_local = fetch (rv. c)
618+ end
619+ else
620+ v_local = call_on_owner (fetch_ref, r)
621+ end
622+
623+ v_cache = @atomic r. v
624+
625+ if v_cache === nothing # call_on_owner case
626+ v_old, status = @lock r. lock begin
627+ @atomicreplace r. v nothing => Some (v_local)
628+ end
629+ # status == true - when value obtained through call_on_owner
630+ # status == false - any other situation: atomicreplace fails, because by the time the lock is obtained cache will be populated
631+ # why? local put! performs caching and putting into channel under r.lock
632+
633+ # for local put! use the cached value, for call_on_owner cases just take the v_local as it was just cached in r.v
634+ v_cache = status ? v_local : v_old
635+ end
636+
590637 send_del_client (r)
591- v
638+ something (v_cache)
592639end
593640
594641fetch_ref (rid, args... ) = fetch (lookup_ref (rid). c, args... )
@@ -612,12 +659,30 @@ A `put!` on an already set `Future` throws an `Exception`.
612659All asynchronous remote calls return `Future`s and set the
613660value to the return value of the call upon completion.
614661"""
615- function put! (rr:: Future , v)
616- rr. v != = nothing && error (" Future can be set only once" )
617- call_on_owner (put_future, rr, v, myid ())
618- rr. v = Some (v)
619- rr
662+ function put! (r:: Future , v)
663+ if r. where == myid ()
664+ rid = remoteref_id (r)
665+ rv = lookup_ref (rid)
666+ isready (rv) && error (" Future can be set only once" )
667+ @lock r. lock begin
668+ put! (rv, v) # this notifies the tasks waiting on the channel in fetch
669+ set_future_cache (r, v) # set the cache before leaving the lock, so that the notified tasks already see it cached
670+ end
671+ del_client (rid, myid ())
672+ else
673+ @lock r. lock begin # same idea as above if there were any local tasks fetching on this Future
674+ call_on_owner (put_future, r, v, myid ())
675+ set_future_cache (r, v)
676+ end
677+ end
678+ r
620679end
680+
681+ function set_future_cache (r:: Future , v)
682+ _, ok = @atomicreplace r. v nothing => Some (v)
683+ ok || error (" internal consistency error detected for Future" )
684+ end
685+
621686function put_future (rid, v, caller)
622687 rv = lookup_ref (rid)
623688 isready (rv) && error (" Future can be set only once" )
0 commit comments