@@ -39,52 +39,76 @@ function Base.copy(t::Task)
3939end
4040
4141@suppress_err function Base. produce (v)
42- # ### un-optimized version
43- # q = current_task().consumers
44- # t = shift!(q.waitq)
45- # empty = isempty(q.waitq)
46- ct = current_task ()
47- local empty, t, q
48- while true
49- q = ct. consumers
50- if isa (q,Task)
51- t = q
52- ct. consumers = nothing
53- empty = true
54- break
55- elseif isa (q,Condition) && ! isempty (q. waitq)
56- t = shift! (q. waitq)
57- empty = isempty (q. waitq)
58- break
42+
43+ ct = current_task ()
44+ local empty, t, q
45+ while true
46+ q = ct. consumers
47+ if isa (q,Task)
48+ t = q
49+ ct. consumers = nothing
50+ empty = true
51+ break
52+ elseif isa (q,Condition) && ! isempty (q. waitq)
53+ t = shift! (q. waitq)
54+ empty = isempty (q. waitq)
55+ break
56+ end
57+ wait ()
5958 end
60- wait ()
61- end
6259
63- t. state = :runnable
64- if empty
65- if isempty (Base. Workqueue)
66- yieldto (t, v)
60+ t. state == :runnable || throw (AssertionError (" producer.consumer.state == :runnable" ))
61+ if empty
62+ Base. schedule_and_wait (t, v)
63+ ct = current_task () # When a task is copied, ct should be updated to new task ID.
64+ while true
65+ # wait until there are more consumers
66+ q = ct. consumers
67+ if isa (q,Task)
68+ return q. result
69+ elseif isa (q,Condition) && ! isempty (q. waitq)
70+ return q. waitq[1 ]. result
71+ end
72+ wait ()
73+ end
6774 else
68- Base. schedule_and_wait (t, v)
69- end
70- ct = current_task () # When a task is copied, ct should be updated to new task ID.
71- while true
72- # wait until there are more consumers
73- q = ct. consumers
74- if isa (q,Task)
75- return q. result
76- elseif isa (q,Condition) && ! isempty (q. waitq)
75+ schedule (t, v)
76+ # make sure `t` runs before us. otherwise, the producer might
77+ # finish before `t` runs again, causing it to see the producer
78+ # as done, causing done(::Task, _) to miss the value `v`.
79+ # see issue #7727
80+ yield ()
7781 return q. waitq[1 ]. result
78- end
79- wait ()
8082 end
81- else
82- schedule (t, v)
83- # make sure `t` runs before us. otherwise, the producer might
84- # finish before `t` runs again, causing it to see the producer
85- # as done, causing done(::Task, _) to miss the value `v`.
86- # see issue #7727
87- yield ()
88- return q. waitq[1 ]. result
89- end
83+ end
84+
85+ produce (v... ) = produce (v)
86+
87+ @suppress_err function Base. consume (P:: Task , values... )
88+
89+ if istaskdone (P)
90+ return wait (P)
91+ end
92+
93+ ct = current_task ()
94+ ct. result = length (values)== 1 ? values[1 ] : values
95+
96+ # ### un-optimized version
97+ # if P.consumers === nothing
98+ # P.consumers = Condition()
99+ # end
100+ # push!(P.consumers.waitq, ct)
101+ # optimized version that avoids the queue for 1 consumer
102+ if P. consumers === nothing || (isa (P. consumers,Condition)&& isempty (P. consumers. waitq))
103+ P. consumers = ct
104+ else
105+ if isa (P. consumers, Task)
106+ t = P. consumers
107+ P. consumers = Condition ()
108+ push! (P. consumers. waitq, t)
109+ end
110+ push! (P. consumers. waitq, ct)
111+ end
112+
113+ P. state == :runnable ? Base. schedule_and_wait (P) : wait () # don't attempt to queue it twice
90114end
0 commit comments