@@ -26,12 +26,13 @@ mutable struct Future <: AbstractRemoteRef
2626    where :: Int 
2727    whence:: Int 
2828    id:: Int 
29-     v:: Union{Some{Any}, Nothing} 
29+     lock:: ReentrantLock 
30+     @atomic  v:: Union{Some{Any}, Nothing} 
3031
3132    Future (w:: Int , rrid:: RRID , v:: Union{Some, Nothing} = nothing ) = 
32-         (r =  new (w,rrid. whence,rrid. id,v); return  test_existing_ref (r))
33+         (r =  new (w,rrid. whence,rrid. id,ReentrantLock (), v); return  test_existing_ref (r))
3334
34-     Future (t:: NTuple{4, Any} ) =  new (t[1 ],t[2 ],t[3 ],t[4 ])  #  Useful for creating dummy, zeroed-out instances
35+     Future (t:: NTuple{4, Any} ) =  new (t[1 ],t[2 ],t[3 ],ReentrantLock (), t[4 ])  #  Useful for creating dummy, zeroed-out instances
3536end 
3637
3738""" 
@@ -69,10 +70,17 @@ function test_existing_ref(r::AbstractRemoteRef)
6970    found =  getkey (client_refs, r, nothing )
7071    if  found != =  nothing 
7172        @assert  r. where >  0 
72-         if  isa (r, Future) &&  found. v ===  nothing  &&  r. v != =  nothing 
73-             #  we have recd the value from another source, probably a deserialized ref, send a del_client message
74-             send_del_client (r)
75-             found. v =  r. v
73+         if  isa (r, Future)
74+             #  this is only for copying the reference from Future to RemoteRef (just created)
75+             fv_cache =  @atomic  :acquire  found. v
76+             rv_cache =  @atomic  :monotonic  r. v
77+             if  fv_cache ===  nothing  &&  rv_cache != =  nothing 
78+                 #  we have recd the value from another source, probably a deserialized ref, send a del_client message
79+                 send_del_client (r)
80+                 @lock  found. lock begin 
81+                     @atomicreplace  found. v nothing  =>  rv_cache
82+                 end 
83+             end 
7684        end 
7785        return  found:: typeof (r)
7886    end 
@@ -91,8 +99,9 @@ function finalize_ref(r::AbstractRemoteRef)
9199                    send_del_client_no_lock (r)
92100                else 
93101                    #  send_del_client only if the reference has not been set
94-                     r. v ===  nothing  &&  send_del_client_no_lock (r)
95-                     r. v =  nothing 
102+                     v_cache =  @atomic  :monotonic  r. v
103+                     v_cache ===  nothing  &&  send_del_client_no_lock (r)
104+                     @atomic  :monotonic  r. v =  nothing 
96105                end 
97106                r. where =  0 
98107            finally 
@@ -201,7 +210,8 @@ isready(f)  # will not block
201210``` 
202211""" 
203212function  isready (rr:: Future )
204-     rr. v ===  nothing  ||  return  true 
213+     v_cache =  @atomic  rr. v
214+     v_cache ===  nothing  ||  return  true 
205215
206216    rid =  remoteref_id (rr)
207217    return  if  rr. where ==  myid ()
@@ -354,26 +364,33 @@ end
354364
355365channel_type (rr:: RemoteChannel{T} ) where  {T} =  T
356366
357- serialize (s:: ClusterSerializer , f:: Future ) =  serialize (s, f, f. v ===  nothing )
358- serialize (s:: ClusterSerializer , rr:: RemoteChannel ) =  serialize (s, rr, true )
359- function  serialize (s:: ClusterSerializer , rr:: AbstractRemoteRef , addclient)
360-     if  addclient
367+ function  serialize (s:: ClusterSerializer , f:: Future )
368+     v_cache =  @atomic  f. v
369+     if  v_cache ===  nothing 
361370        p =  worker_id_from_socket (s. io)
362-         (p != =  rr . where) &&  send_add_client (rr , p)
371+         (p != =  f . where) &&  send_add_client (f , p)
363372    end 
373+     fc =  Future ((f. where, f. whence, f. id, v_cache)) #  copy to be used for serialization (contains a reset lock)
374+     invoke (serialize, Tuple{ClusterSerializer, Any}, s, fc)
375+ end 
376+ 
377+ function  serialize (s:: ClusterSerializer , rr:: RemoteChannel )
378+     p =  worker_id_from_socket (s. io)
379+     (p != =  rr. where) &&  send_add_client (rr, p)
364380    invoke (serialize, Tuple{ClusterSerializer, Any}, s, rr)
365381end 
366382
367383function  deserialize (s:: ClusterSerializer , t:: Type{<:Future} )
368-     f  =  invoke (deserialize, Tuple{ClusterSerializer, DataType}, s, t)
369-     f2 =  Future (f . where, RRID (f . whence, f . id), f . v) #  ctor adds to client_refs table
384+     fc  =  invoke (deserialize, Tuple{ClusterSerializer, DataType}, s, t)  #  deserialized copy 
385+     f2 =  Future (fc . where, RRID (fc . whence, fc . id), fc . v) #  ctor adds to client_refs table
370386
371387    #  1) send_add_client() is not executed when the ref is being serialized
372388    #     to where it exists, hence do it here.
373389    #  2) If we have received a 'fetch'ed Future or if the Future ctor found an
374390    #     already 'fetch'ed instance in client_refs (Issue #25847), we should not
375391    #     track it in the backing RemoteValue store.
376-     if  f2. where ==  myid () &&  f2. v ===  nothing 
392+     f2v_cache =  @atomic  f2. v
393+     if  f2. where ==  myid () &&  f2v_cache ===  nothing 
377394        add_client (remoteref_id (f2), myid ())
378395    end 
379396    f2
567584
568585Wait for a value to become available for the specified [`Future`](@ref). 
569586""" 
570- wait (r:: Future ) =  (r. v != =  nothing  &&  return  r; call_on_owner (wait_ref, r, myid ()); r)
587+ wait (r:: Future ) =  (v_cache  =   @atomic   r. v; v_cache  != =  nothing  &&  return  r; call_on_owner (wait_ref, r, myid ()); r)
571588
572589""" 
573590    wait(r::RemoteChannel, args...) 
@@ -584,11 +601,41 @@ Further calls to `fetch` on the same reference return the cached value. If the r
584601is an exception, throws a [`RemoteException`](@ref) which captures the remote exception and backtrace. 
585602""" 
586603function  fetch (r:: Future )
587-     r. v != =  nothing  &&  return  something (r. v)
588-     v =  call_on_owner (fetch_ref, r)
589-     r. v =  Some (v)
604+     v_cache =  @atomic  r. v
605+     v_cache != =  nothing  &&  return  something (v_cache)
606+ 
607+     if  r. where ==  myid ()
608+         rv, v_cache =  @lock  r. lock begin 
609+             v_cache =  @atomic  :monotonic  r. v
610+             rv =  v_cache ===  nothing  ?  lookup_ref (remoteref_id (r)) :  nothing 
611+             rv, v_cache
612+         end 
613+ 
614+         if  v_cache != =  nothing 
615+             return  something (v_cache)
616+         else 
617+             v_local =  fetch (rv. c)
618+         end 
619+     else 
620+         v_local =  call_on_owner (fetch_ref, r)
621+     end 
622+ 
623+     v_cache =  @atomic  r. v
624+ 
625+     if  v_cache ===  nothing  #  call_on_owner case
626+         v_old, status =  @lock  r. lock begin 
627+             @atomicreplace  r. v nothing  =>  Some (v_local)
628+         end 
629+         #  status == true - when value obtained through call_on_owner
630+         #  status == false - any other situation: atomicreplace fails, because by the time the lock is obtained cache will be populated
631+         #  why? local put! performs caching and putting into channel under r.lock
632+ 
633+         #  for local put! use the cached value, for call_on_owner cases just take the v_local as it was just cached in r.v
634+         v_cache =  status ?  v_local :  v_old
635+     end 
636+ 
590637    send_del_client (r)
591-     v 
638+     something (v_cache) 
592639end 
593640
594641fetch_ref (rid, args... ) =  fetch (lookup_ref (rid). c, args... )
@@ -612,12 +659,30 @@ A `put!` on an already set `Future` throws an `Exception`.
612659All asynchronous remote calls return `Future`s and set the 
613660value to the return value of the call upon completion. 
614661""" 
615- function  put! (rr:: Future , v)
616-     rr. v != =  nothing  &&  error (" Future can be set only once" 
617-     call_on_owner (put_future, rr, v, myid ())
618-     rr. v =  Some (v)
619-     rr
662+ function  put! (r:: Future , v)
663+     if  r. where ==  myid ()
664+         rid =  remoteref_id (r)
665+         rv =  lookup_ref (rid)
666+         isready (rv) &&  error (" Future can be set only once" 
667+         @lock  r. lock begin 
668+             put! (rv, v) #  this notifies the tasks waiting on the channel in fetch
669+             set_future_cache (r, v) #  set the cache before leaving the lock, so that the notified tasks already see it cached
670+         end 
671+         del_client (rid, myid ())
672+     else 
673+         @lock  r. lock begin  #  same idea as above if there were any local tasks fetching on this Future
674+             call_on_owner (put_future, r, v, myid ())
675+             set_future_cache (r, v)
676+         end 
677+     end 
678+     r
620679end 
680+ 
681+ function  set_future_cache (r:: Future , v)
682+     _, ok =  @atomicreplace  r. v nothing  =>  Some (v)
683+     ok ||  error (" internal consistency error detected for Future" 
684+ end 
685+ 
621686function  put_future (rid, v, caller)
622687    rv =  lookup_ref (rid)
623688    isready (rv) &&  error (" Future can be set only once" 
0 commit comments