Skip to content

Commit 29f61cd

Browse files
vtjnashJeffBezanson
authored andcommitted
channels: remove WeakRef from Condition (#31673)
Using a WeakRef meant we might not actually `bind` the result. If nobody was still holding a reference to put contents into the Condition, we would simply garbage collect it, and then never need to close it. Since that does not seem to be the intent, instead move to just keeping a strong reference (alternatively, we would have to switch to using `stream_wait` with ref-counting, but that seems suboptimal for several reasons.). fix #31507
1 parent b4e95cb commit 29f61cd

File tree

2 files changed

+31
-31
lines changed

2 files changed

+31
-31
lines changed

base/channels.jl

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -192,8 +192,10 @@ Stacktrace:
192192
```
193193
"""
194194
function bind(c::Channel, task::Task)
195-
ref = WeakRef(c)
196-
register_taskdone_hook(task, tsk->close_chnl_on_taskdone(tsk, ref))
195+
# TODO: implement "schedulewait" and deprecate taskdone_hook
196+
#T = Task(() -> close_chnl_on_taskdone(task, c))
197+
#schedulewait(task, T)
198+
register_taskdone_hook(task, tsk -> close_chnl_on_taskdone(tsk, c))
197199
return c
198200
end
199201

@@ -223,33 +225,30 @@ function channeled_tasks(n::Int, funcs...; ctypes=fill(Any,n), csizes=fill(0,n))
223225
return (chnls, tasks)
224226
end
225227

226-
function close_chnl_on_taskdone(t::Task, ref::WeakRef)
227-
c = ref.value
228-
if c isa Channel
229-
isopen(c) || return
230-
cleanup = () -> try
231-
isopen(c) || return
232-
if istaskfailed(t)
233-
excp = task_result(t)
234-
if excp isa Exception
235-
close(c, excp)
236-
return
237-
end
228+
function close_chnl_on_taskdone(t::Task, c::Channel)
229+
isopen(c) || return
230+
cleanup = () -> try
231+
isopen(c) || return
232+
if istaskfailed(t)
233+
excp = task_result(t)
234+
if excp isa Exception
235+
close(c, excp)
236+
return
238237
end
239-
close(c)
240-
return
241-
finally
242-
unlock(c)
243238
end
244-
if trylock(c)
245-
# can't use `lock`, since attempts to task-switch to wait for it
246-
# will just silently fail and leave us with broken state
247-
cleanup()
248-
else
249-
# so schedule this to happen once we are finished destroying our task
250-
# (on a new Task)
251-
@async (lock(c); cleanup())
239+
close(c)
240+
return
241+
finally
242+
unlock(c)
252243
end
244+
if trylock(c)
245+
# can't use `lock`, since attempts to task-switch to wait for it
246+
# will just silently fail and leave us with broken state
247+
cleanup()
248+
else
249+
# so schedule this to happen once we are finished destroying our task
250+
# (on a new Task)
251+
@async (lock(c); cleanup())
253252
end
254253
nothing
255254
end

test/channels.jl

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -88,19 +88,20 @@ using Distributed
8888
@testset "channels bound to tasks" for N in [0, 10]
8989
# Normal exit of task
9090
c = Channel(N)
91-
bind(c, @async (yield(); nothing))
91+
bind(c, @async (GC.gc(); yield(); nothing))
9292
@test_throws InvalidStateException take!(c)
9393
@test !isopen(c)
9494

9595
# Error exception in task
9696
c = Channel(N)
97-
bind(c, @async (yield(); error("foo")))
97+
bind(c, @async (GC.gc(); yield(); error("foo")))
9898
@test_throws ErrorException take!(c)
9999
@test !isopen(c)
100100

101101
# Multiple channels closed by the same bound task
102102
cs = [Channel(N) for i in 1:5]
103-
tf2 = () -> begin
103+
tf2() = begin
104+
GC.gc()
104105
if N > 0
105106
foreach(c -> (@assert take!(c) === 2), cs)
106107
end
@@ -129,16 +130,16 @@ using Distributed
129130
# Multiple tasks, first one to terminate closes the channel
130131
nth = rand(1:5)
131132
ref = Ref(0)
132-
cond = Condition()
133133
tf3(i) = begin
134+
GC.gc()
134135
if i == nth
135136
ref[] = i
136137
else
137138
sleep(2.0)
138139
end
139140
end
140141

141-
tasks = [Task(()->tf3(i)) for i in 1:5]
142+
tasks = [Task(() -> tf3(i)) for i in 1:5]
142143
c = Channel(N)
143144
foreach(t -> bind(c, t), tasks)
144145
foreach(schedule, tasks)

0 commit comments

Comments
 (0)