@@ -94,6 +94,45 @@ module Locking = struct
94
94
Mutex. unlock t.mutex;
95
95
Some v
96
96
97
+ let select_of_many streams_fns =
98
+ let finished = Atomic. make false in
99
+ let cancel_fns = ref [] in
100
+ let add_cancel_fn fn = cancel_fns := fn :: ! cancel_fns in
101
+ let cancel_all () = List. iter (fun fn -> fn () ) ! cancel_fns in
102
+ let wait ctx enqueue (t , f ) = begin
103
+ Mutex. lock t.mutex;
104
+ (* First check if any items are already available and return early if there are. *)
105
+ if not (Queue. is_empty t.items)
106
+ then (
107
+ cancel_all () ;
108
+ Mutex. unlock t.mutex;
109
+ enqueue (Ok (f (Queue. take t.items))))
110
+ else add_cancel_fn @@
111
+ (* Otherwise, register interest in this stream. *)
112
+ Waiters. cancellable_await_internal ~mutex: (Some t.mutex) t.readers t.id ctx (fun r ->
113
+ if Result. is_ok r then (
114
+ if not (Atomic. compare_and_set finished false true ) then (
115
+ (* Another stream has yielded an item in the meantime. However, as
116
+ we have been waiting on this stream it must have been empty.
117
+
118
+ As the stream's mutex was held since before last checking for an item,
119
+ the queue must be empty.
120
+ *)
121
+ assert ((Queue. length t.items) < t.capacity);
122
+ Queue. add (Result. get_ok r) t.items
123
+ ) else (
124
+ (* remove all other entries of this fiber in other streams' waiters. *)
125
+ cancel_all ()
126
+ ));
127
+ (* item is returned to waiting caller through enqueue and enter_unchecked. *)
128
+ enqueue (Result. map f r))
129
+ end in
130
+ (* Register interest in all streams and return first available item. *)
131
+ let wait_for_stream streams_fns = begin
132
+ Suspend. enter_unchecked (fun ctx enqueue -> List. iter (wait ctx enqueue) streams_fns)
133
+ end in
134
+ wait_for_stream streams_fns
135
+
97
136
let length t =
98
137
Mutex. lock t.mutex;
99
138
let len = Queue. length t.items in
@@ -125,6 +164,13 @@ let take_nonblocking = function
125
164
| Sync x -> Sync. take_nonblocking x
126
165
| Locking x -> Locking. take_nonblocking x
127
166
167
+ let select streams =
168
+ let filter s = match s with
169
+ | (Sync _ , _ ) -> assert false
170
+ | (Locking x , f ) -> (x, f)
171
+ in
172
+ Locking. select_of_many (List. map filter streams)
173
+
128
174
let length = function
129
175
| Sync _ -> 0
130
176
| Locking x -> Locking. length x
0 commit comments