@@ -389,13 +389,13 @@ let rec consumer_resume_cell t ~success ?in_transition cell =
389
389
if Atomic. compare_and_set cell old Finished then success req
390
390
else consumer_resume_cell t ~success ?in_transition cell
391
391
392
- let take_suspend_select ~enqueue ~ctx ~cancel_all t loc finished =
392
+ let take_suspend_select ~enqueue ~ctx ~cancel_all t f loc finished =
393
393
let Short cell | Long (_, cell) = loc in
394
394
let kc v = begin
395
395
if (Atomic. compare_and_set finished false true ) then (
396
396
cancel_all () ;
397
397
(* deliver value *)
398
- enqueue (Ok v );
398
+ enqueue (Ok (f v) );
399
399
true
400
400
) else (
401
401
(* reject value, let producer try again *)
@@ -447,12 +447,12 @@ let take (t : _ t) =
447
447
take_suspend t (Long (Q. next_suspend t.consumers))
448
448
)
449
449
450
- let select_of_many (type a ) (ts : a t list ) =
450
+ let select_of_many (type a b ) (ts : ( a t * (a -> b) ) list ) =
451
451
let finished = Atomic. make false in
452
452
let cancel_fns = ref [] in
453
453
let add_cancel_fn fn = cancel_fns := (fn :: ! cancel_fns) in
454
454
let cancel_all () = List. iter (fun fn -> fn () ) ! cancel_fns in
455
- let wait ctx enqueue (t : a t ) = begin
455
+ let wait ctx enqueue (( t , f ): ( a t * ( a -> b )) ) = begin
456
456
if (Atomic. fetch_and_add t.balance (- 1 )) > 0 then (
457
457
(* have item, can cancel remaining stream waiters*)
458
458
if Atomic. compare_and_set finished false true then (
@@ -464,14 +464,14 @@ let select_of_many (type a) (ts: a t list) =
464
464
let v = consumer_resume_cell t cell
465
465
~success: (fun it -> it.kp (Ok true ); it.v)
466
466
?in_transition:None in
467
- enqueue (Ok v )
467
+ enqueue (Ok (f v) )
468
468
) else (
469
469
(* restore old balance, because another stream was ready first. *)
470
470
ignore (Atomic. fetch_and_add t.balance (+ 1 ))
471
471
)
472
472
) else (
473
473
let cell = Long (Q. next_suspend t.consumers) in
474
- let cancel_fn = take_suspend_select ~enqueue ~ctx ~cancel_all t cell finished in
474
+ let cancel_fn = take_suspend_select ~enqueue ~ctx ~cancel_all t f cell finished in
475
475
add_cancel_fn cancel_fn
476
476
)
477
477
end in
0 commit comments