@@ -21,6 +21,7 @@ type Channel{T} <: AbstractChannel
2121 cond_take:: Condition # waiting for data to become available
2222 cond_put:: Condition # waiting for a writeable slot
2323 state:: Symbol
24+ excp:: Nullable{Exception} # Exception to be thrown when state != :open
2425
2526 data:: Array{T,1}
2627 sz_max:: Int # maximum size of channel
@@ -39,7 +40,7 @@ type Channel{T} <: AbstractChannel
3940 if sz < 0
4041 throw (ArgumentError (" Channel size must be either 0, a positive integer or Inf" ))
4142 end
42- new (Condition (), Condition (), :open , Array {T} (0 ), sz, Array {Condition} (0 ))
43+ new (Condition (), Condition (), :open , Nullable {Exception} (), Array {T} (0 ), sz, Array {Condition} (0 ))
4344 end
4445
4546 # deprecated empty constructor
5354
5455Channel (sz) = Channel {Any} (sz)
5556
57+ # special constructors
58+ """
59+ Channel(func::Function; ctype=Any, csize=0, taskref=nothing)
60+
61+ Creates a new task from `func`, binds it to a new channel of type
62+ `ctype` and size `csize`, schedules the task, all in a single call.
63+
64+ `func` must accept the bound channel as its only argument.
65+
66+ If you need a reference to the created task, pass a `Ref{Task}` object via
67+ keyword argument `taskref`.
68+
69+ Returns a Channel.
70+
71+ ```jldoctest
72+ julia> chnl = Channel(c->foreach(i->put!(c,i), 1:4));
73+
74+ julia> @show typeof(chnl);
75+ typeof(chnl) = Channel{Any}
76+
77+ julia> for i in chnl
78+ @show i
79+ end;
80+ i = 1
81+ i = 2
82+ i = 3
83+ i = 4
84+
85+ ```
86+
87+ An example of referencing the created task:
88+
89+ ```jldoctest
90+ julia> taskref = Ref{Task}();
91+
92+ julia> chnl = Channel(c->(@show take!(c)); taskref=taskref);
93+
94+ julia> task = taskref[];
95+
96+ julia> @show istaskdone(task);
97+ istaskdone(task) = false
98+
99+ julia> put!(chnl, "Hello");
100+ take!(c) = "Hello"
101+
102+ julia> @show istaskdone(task);
103+ istaskdone(task) = true
104+
105+ ```
106+ """
107+ function Channel (func:: Function ; ctype= Any, csize= 0 , taskref= nothing )
108+ chnl = Channel {ctype} (csize)
109+ task = Task (()-> func (chnl))
110+ bind (chnl,task)
111+ schedule (task)
112+ yield ()
113+
114+ isa (taskref, Ref{Task}) && (taskref. x = task)
115+ return chnl
116+ end
117+
118+
119+
56120# deprecated empty constructor
57121Channel () = Channel {Any} ()
58122
59123closed_exception () = InvalidStateException (" Channel is closed." , :closed )
60124
61125isbuffered (c:: Channel ) = c. sz_max== 0 ? false : true
62126
127+ function check_channel_state (c:: Channel )
128+ if ! isopen (c)
129+ ! isnull (c. excp) && throw (get (c. excp))
130+ throw (closed_exception ())
131+ end
132+ end
63133"""
64134 close(c::Channel)
65135
@@ -70,11 +140,110 @@ Closes a channel. An exception is thrown by:
70140"""
71141function close (c:: Channel )
72142 c. state = :closed
73- notify_error (c:: Channel , closed_exception ())
143+ c. excp = Nullable {} (closed_exception ())
144+ notify_error (c)
74145 nothing
75146end
76147isopen (c:: Channel ) = (c. state == :open )
77148
149+ """
150+ bind(chnl::Channel, task::Task)
151+
152+ Associates the lifetime of `chnl` with a task.
153+ Channel `chnl` is automatically closed when the task terminates.
154+ Any uncaught exception in the task is propagated to all waiters on `chnl`.
155+
156+ The `chnl` object can be explicitly closed independent of task termination.
157+ Terminating tasks have no effect on already closed Channel objects.
158+
159+ When a channel is bound to multiple tasks, the first task to terminate will
160+ close the channel. When multiple channels are bound to the same task,
161+ termination of the task will close all channels.
162+
163+ ```jldoctest
164+ julia> c = Channel(0);
165+
166+ julia> task = @schedule foreach(i->put!(c, i), 1:4);
167+
168+ julia> bind(c,task);
169+
170+ julia> for i in c
171+ @show i
172+ end;
173+ i = 1
174+ i = 2
175+ i = 3
176+ i = 4
177+
178+ julia> @show isopen(c);
179+ isopen(c) = false
180+
181+ ```
182+
183+ ```jldoctest
184+ julia> c = Channel(0);
185+
186+ julia> task = @schedule (put!(c,1);error("foo"));
187+
188+ julia> bind(c,task);
189+
190+ julia> take!(c);
191+
192+ julia> put!(c,1);
193+ ERROR: foo
194+ Stacktrace:
195+ [1] check_channel_state(::Channel{Any}) at ./channels.jl:129
196+ [2] put!(::Channel{Any}, ::Int64) at ./channels.jl:247
197+
198+ ```
199+ """
200+ function bind (c:: Channel , task:: Task )
201+ ref = WeakRef (c)
202+ register_taskdone_hook (task, tsk-> close_chnl_on_taskdone (tsk, ref))
203+ c
204+ end
205+
206+ """
207+ channeled_tasks(n::Int, funcs...; ctypes=fill(Any,n), csizes=fill(0,n))
208+
209+ A convenience method to create `n` channels and bind them to tasks started
210+ from the provided functions in a single call. Each `func` must accept `n` arguments
211+ which are the created channels. Channel types and sizes may be specified via
212+ keyword arguments `ctypes` and `csizes` respectively. If unspecified, all channels are
213+ of type `Channel{Any}(0)`.
214+
215+ Returns a tuple, `(Array{Channel}, Array{Task})`, of the created channels and tasks.
216+ """
217+ function channeled_tasks (n:: Int , funcs... ; ctypes= fill (Any,n), csizes= fill (0 ,n))
218+ @assert length (csizes) == n
219+ @assert length (ctypes) == n
220+
221+ chnls = map (i-> Channel {ctypes[i]} (csizes[i]), 1 : n)
222+ tasks= Task[Task (()-> f (chnls... )) for f in funcs]
223+
224+ # bind all tasks to all channels and schedule them
225+ foreach (t -> foreach (c -> bind (c,t), chnls), tasks)
226+ foreach (t-> schedule (t), tasks)
227+
228+ yield () # Allow scheduled tasks to run
229+
230+ return (chnls, tasks)
231+ end
232+
233+ function close_chnl_on_taskdone (t:: Task , ref:: WeakRef )
234+ if ref. value != = nothing
235+ c = ref. value
236+ ! isopen (c) && return
237+ if istaskfailed (t)
238+ c. state = :closed
239+ c. excp = Nullable {Exception} (task_result (t))
240+ notify_error (c)
241+ else
242+ close (c)
243+ end
244+ end
245+ end
246+
78247type InvalidStateException <: Exception
79248 msg:: AbstractString
80249 state:: Symbol
@@ -89,7 +258,7 @@ For unbuffered channels, blocks until a [`take!`](@ref) is performed by a differ
89258task.
90259"""
91260function put! (c:: Channel , v)
92- ! isopen (c) && throw ( closed_exception () )
261+ check_channel_state (c )
93262 isbuffered (c) ? put_buffered (c,v) : put_unbuffered (c,v)
94263end
95264
@@ -98,7 +267,9 @@ function put_buffered(c::Channel, v)
98267 wait (c. cond_put)
99268 end
100269 push! (c. data, v)
101- notify (c. cond_take, nothing , true , false ) # notify all, since some of the waiters may be on a "fetch" call.
270+
271+ # notify all, since some of the waiters may be on a "fetch" call.
272+ notify (c. cond_take, nothing , true , false )
102273 v
103274end
104275
@@ -108,7 +279,7 @@ function put_unbuffered(c::Channel, v)
108279 wait (c. cond_put)
109280 end
110281 cond_taker = shift! (c. takers)
111- notify (cond_taker, v, false , false )
282+ notify (cond_taker, v, false , false ) > 0 && yield ()
112283 v
113284end
114285
@@ -148,7 +319,7 @@ shift!(c::Channel) = take!(c)
148319
149320# 0-size channel
150321function take_unbuffered (c:: Channel )
151- ! isopen (c) && throw ( closed_exception () )
322+ check_channel_state (c )
152323 cond_taker = Condition ()
153324 push! (c. takers, cond_taker)
154325 notify (c. cond_put, nothing , false , false )
@@ -178,7 +349,7 @@ n_avail(c::Channel) = isbuffered(c) ? length(c.data) : n_waiters(c.cond_put)
178349
179350function wait (c:: Channel )
180351 while ! isready (c)
181- ! isopen (c) && throw ( closed_exception () )
352+ check_channel_state (c )
182353 wait (c. cond_take)
183354 end
184355 nothing
@@ -189,19 +360,20 @@ function notify_error(c::Channel, err)
189360 notify_error (c. cond_put, err)
190361 foreach (x-> notify_error (x, err), c. takers)
191362end
363+ notify_error (c:: Channel ) = notify_error (c, get (c. excp))
192364
193365eltype {T} (:: Type{Channel{T}} ) = T
194366
195367show (io:: IO , c:: Channel ) = print (io, " $(typeof (c)) (sz_max:$(c. sz_max) ,sz_curr:$(n_avail (c)) )" )
196368
197- type ChannelState {T}
369+ type ChannelIterState {T}
198370 hasval:: Bool
199371 val:: T
200- ChannelState (x) = new (x)
372+ ChannelIterState (x) = new (x)
201373end
202374
203- start {T} (c:: Channel{T} ) = ChannelState {T} (false )
204- function done (c:: Channel , state:: ChannelState )
375+ start {T} (c:: Channel{T} ) = ChannelIterState {T} (false )
376+ function done (c:: Channel , state:: ChannelIterState )
205377 try
206378 # we are waiting either for more data or channel to be closed
207379 state. hasval && return false
0 commit comments