Skip to content

Commit dbd15fb

Browse files
committed
Add bounded blocking Stack
1 parent 1ba3357 commit dbd15fb

File tree

10 files changed

+235
-55
lines changed

10 files changed

+235
-55
lines changed

bench/bench_stack.ml

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
open Multicore_bench
2+
open Picos_std_sync
3+
4+
let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () =
5+
let t = Stack.create ~padded:true () in
6+
7+
let op push =
8+
if push then Stack.push t 101
9+
else match Stack.pop_exn t with _ -> () | exception Stack.Empty -> ()
10+
in
11+
12+
let init _ =
13+
assert (
14+
match Stack.pop_exn t with _ -> false | exception Stack.Empty -> true);
15+
Util.generate_push_and_pop_sequence n_msgs
16+
in
17+
let work _ bits = Util.Bits.iter op bits in
18+
19+
Times.record ~budgetf ~n_domains:1 ~init ~work ()
20+
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain"
21+
22+
let run_one ~budgetf ~n_adders ~n_takers () =
23+
let n_domains = n_adders + n_takers in
24+
25+
let n_msgs = 50 * Util.iter_factor in
26+
27+
let t = Stack.create ~padded:true () in
28+
29+
let n_msgs_to_add = Countdown.create ~n_domains:n_adders () in
30+
let n_msgs_to_take = Countdown.create ~n_domains:n_takers () in
31+
32+
let init _ =
33+
assert (
34+
match Stack.pop_exn t with _ -> false | exception Stack.Empty -> true);
35+
Countdown.non_atomic_set n_msgs_to_add n_msgs;
36+
Countdown.non_atomic_set n_msgs_to_take n_msgs
37+
in
38+
let work i () =
39+
if i < n_adders then
40+
let rec work () =
41+
let n = Countdown.alloc n_msgs_to_add ~domain_index:i ~batch:1000 in
42+
if 0 < n then begin
43+
for i = 1 to n do
44+
Stack.push t i
45+
done;
46+
work ()
47+
end
48+
in
49+
work ()
50+
else
51+
let i = i - n_adders in
52+
let rec work () =
53+
let n = Countdown.alloc n_msgs_to_take ~domain_index:i ~batch:1000 in
54+
if 0 < n then
55+
let rec loop n =
56+
if 0 < n then begin
57+
match Stack.pop_exn t with
58+
| _ -> loop (n - 1)
59+
| exception Stack.Empty ->
60+
Backoff.once Backoff.default |> ignore;
61+
loop n
62+
end
63+
else work ()
64+
in
65+
loop n
66+
in
67+
work ()
68+
in
69+
70+
let config =
71+
let format role n =
72+
Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s")
73+
in
74+
Printf.sprintf "%s, %s"
75+
(format "nb adder" n_adders)
76+
(format "nb taker" n_takers)
77+
in
78+
Times.record ~budgetf ~n_domains ~init ~work ()
79+
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config
80+
81+
let run_suite ~budgetf =
82+
run_one_domain ~budgetf ()
83+
@ (Util.cross [ 1; 2; 4 ] [ 1; 2; 4 ]
84+
|> List.concat_map @@ fun (n_adders, n_takers) ->
85+
if Picos_domain.recommended_domain_count () < n_adders + n_takers then []
86+
else run_one ~budgetf ~n_adders ~n_takers ())

bench/dune

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
(run %{test} -brief "Fib")
2323
(run %{test} -brief "Picos binaries")
2424
(run %{test} -brief "Bounded_q with Picos_sync")
25+
(run %{test} -brief "Stack")
2526
(run %{test} -brief "Memory usage")))
2627
(foreign_stubs
2728
(language c)

bench/main.ml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ let benchmarks =
2222
("Picos binaries", Bench_binaries.run_suite);
2323
("Bounded_q with Picos_sync", Bench_bounded_q.run_suite);
2424
("Memory usage", Bench_memory.run_suite);
25+
("Stack", Bench_stack.run_suite);
2526
]
2627

2728
let () = Multicore_bench.Cmd.run ~benchmarks ()

lib/picos_std.awaitable/dune

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
(library
22
(name picos_std_awaitable)
33
(public_name picos_std.awaitable)
4-
(libraries picos picos_aux.htbl backoff multicore-magic))
4+
(libraries
5+
(re_export picos)
6+
picos_aux.htbl
7+
backoff
8+
multicore-magic))
59

610
(mdx
711
(package picos_meta)

lib/picos_std.awaitable/picos_std_awaitable.ml

Lines changed: 42 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -355,13 +355,14 @@ module Awaitable = struct
355355
let update t ~signal ~count =
356356
try
357357
let signal = ref signal in
358+
let count = ref count in
358359
let backoff = ref Backoff.default in
359360
while
360361
not
361362
(let before = Htbl.find_exn awaiters t in
362363
match
363-
if !signal then Awaiters.signal before ~count
364-
else Awaiters.cleanup before ~count
364+
if !signal then Awaiters.signal before ~count:!count
365+
else Awaiters.cleanup before ~count:!count
365366
with
366367
| Zero -> Htbl.try_compare_and_remove awaiters t before
367368
| One r ->
@@ -373,58 +374,70 @@ module Awaitable = struct
373374
before == after
374375
|| Htbl.try_compare_and_set awaiters t before after)
375376
do
377+
(* Even if the hash table update after signal fails, the trigger(s) have
378+
been signaled. *)
376379
signal := false;
380+
(* If a single awaiter and multi awaiter cleanup are attempted in
381+
parallel it might be that a multi awaiter cleanup "succeeds" and yet
382+
some awaiters are left in the queue. For this reason we perform a
383+
multi awaiter cleanup after failure. It might be possible to improve
384+
upon this with some more clever approach. *)
385+
count := Int.max_int;
377386
backoff := Backoff.once !backoff
378387
done
379388
with Not_found -> ()
380389

381-
let add_as (type a) (t : a awaitable) value =
382-
let trigger = Trigger.create () in
383-
let one : Awaiters.is1 =
384-
One { awaitable = t; value; trigger; counter = 0; next = Min0 Zero }
385-
in
386-
let backoff = ref Backoff.default in
387-
while
388-
not
389-
(match Htbl.find_exn awaiters (Packed t) with
390-
| before ->
391-
let many = Awaiters.snoc before one in
392-
Htbl.try_compare_and_set awaiters (Packed t) before (Min1 many)
393-
| exception Not_found -> Htbl.try_add awaiters (Packed t) (Min1 one))
394-
do
395-
backoff := Backoff.once !backoff
396-
done;
397-
one
398-
399390
module Awaiter = struct
400391
type t = Awaiters.is1
401392

402-
let add (type a) (t : a awaitable) =
403-
add_as t (Sys.opaque_identity (Obj.magic awaiters : a))
393+
let add_as (type a) (t : a awaitable) trigger value =
394+
let one : Awaiters.is1 =
395+
One { awaitable = t; value; trigger; counter = 0; next = Min0 Zero }
396+
in
397+
let backoff = ref Backoff.default in
398+
while
399+
not
400+
(match Htbl.find_exn awaiters (Packed t) with
401+
| before ->
402+
let many = Awaiters.snoc before one in
403+
Htbl.try_compare_and_set awaiters (Packed t) before (Min1 many)
404+
| exception Not_found -> Htbl.try_add awaiters (Packed t) (Min1 one))
405+
do
406+
backoff := Backoff.once !backoff
407+
done;
408+
one
409+
410+
let add (type a) (t : a awaitable) trigger =
411+
let unique_value = Sys.opaque_identity (Obj.magic awaiters : a) in
412+
add_as t trigger unique_value
404413

405414
let remove one =
406415
Awaiters.signal_and_clear one;
407416
update (Awaiters.awaitable_of one) ~signal:false ~count:1
417+
end
408418

409-
let await one =
419+
let await t value =
420+
let trigger = Trigger.create () in
421+
let one = Awaiter.add_as t trigger value in
422+
if Awaiters.is_signalable one then Awaiter.remove one
423+
else
410424
match Awaiters.await one with
411425
| None -> ()
412426
| Some exn_bt ->
413427
Awaiters.clear one;
414428
update (Awaiters.awaitable_of one) ~signal:true ~count:1;
415429
Printexc.raise_with_backtrace (fst exn_bt) (snd exn_bt)
416-
end
417-
418-
let await t value =
419-
let one = add_as t value in
420-
if Awaiters.is_signalable one then Awaiter.remove one else Awaiter.await one
421430

422431
let[@inline] broadcast t = update (Packed t) ~signal:true ~count:Int.max_int
423432
let[@inline] signal t = update (Packed t) ~signal:true ~count:1
424433

425434
let () =
426435
Stdlib.at_exit @@ fun () ->
427436
match Htbl.find_random_exn awaiters with
428-
| _ -> failwith "leaked awaitable"
437+
| _ ->
438+
(* This should not normally happen, but might happen due to the program
439+
being forced to exit without proper cleanup. Otherwise this may
440+
indicate a bug in the cleanup of awaiters. *)
441+
Printf.eprintf "Awaitable leaked\n%!"
429442
| exception Not_found -> ()
430443
end

lib/picos_std.awaitable/picos_std_awaitable.mli

Lines changed: 19 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
(** Basic {{:https://en.wikipedia.org/wiki/Futex} futex}-like awaitable atomic
22
location for {!Picos}. *)
33

4+
open Picos
5+
46
(** {1 Modules} *)
57

68
module Awaitable : sig
@@ -18,7 +20,7 @@ module Awaitable : sig
1820

1921
(** {1 Atomic API} *)
2022

21-
type 'a t
23+
type !'a t
2224
(** Represents an awaitable atomic location. *)
2325

2426
val make : ?padded:bool -> 'a -> 'a t
@@ -90,34 +92,26 @@ module Awaitable : sig
9092
implicitly wake up awaiters. *)
9193

9294
module Awaiter : sig
93-
(** Ability to await for a signal from the past.
94-
95-
{!Awaitable.await} only receives a signal at or after the point of
96-
calling it. This API allows the awaiting process to be broken into two
97-
steps, {!add} and {!await}, such that a signal after {!add} can be
98-
received by {!await}. *)
95+
(** Low level interface for more flexible waiting. *)
9996

10097
type 'a awaitable := 'a t
10198
(** An erased type alias for {!Awaitable.t}. *)
10299

103100
type t
104101
(** Represents a single use awaiter of a signal to an {!awaitable}. *)
105102

106-
val add : 'a awaitable -> t
107-
(** [add awaitable] create a single use awaiter, adds it to the FIFO
108-
associated with the awaitable, and returns the awaiter. *)
109-
110-
val await : t -> unit
111-
(** [await awaiter] awaits for the association awaitable to be signaled. *)
103+
val add : 'a awaitable -> Trigger.t -> t
104+
(** [add awaitable trigger] creates a single use awaiter, adds it to the
105+
FIFO associated with the awaitable, and returns the awaiter. *)
112106

113107
val remove : t -> unit
114108
(** [remove awaiter] marks the awaiter as having been signaled and removes it
115109
from the FIFO associated with the awaitable.
116110
117-
⚠️ An explicit call of [remove] is needed when an {!add}ed awaiter is not
118-
{!await}ed for. In such a case, from the point of view of lost signals,
119-
the caller of [remove] should be considered to have received or consumed
120-
a signal before the call of [remove]. *)
111+
ℹ️ If the associated trigger is used with only one awaiter and the
112+
{!Trigger.await await} on the trigger returns [None], there is no need
113+
to explicitly remove the awaiter, because it has already been
114+
removed. *)
121115
end
122116
end
123117

@@ -164,7 +158,7 @@ end
164158
{2 [Condition]}
165159
166160
Let's also implement a condition variable. For that we'll also make use of
167-
low level operations in the {!Picos} core library:
161+
low level abstractions and operations from the {!Picos} core library:
168162
169163
{[
170164
# open Picos
@@ -180,20 +174,21 @@ end
180174
let create () = Awaitable.make ()
181175
182176
let wait t mutex =
183-
let awaiter = Awaitable.Awaiter.add t in
177+
let trigger = Trigger.create () in
178+
let awaiter = Awaitable.Awaiter.add t trigger in
184179
Mutex.unlock mutex;
185180
let lock_forbidden mutex =
186181
let fiber = Fiber.current () in
187182
let forbid = Fiber.exchange fiber ~forbid:true in
188183
Mutex.lock mutex;
189184
Fiber.set fiber ~forbid
190185
in
191-
match Awaitable.Awaiter.await awaiter with
192-
| () -> lock_forbidden mutex
193-
| exception exn ->
194-
let bt = Printexc.get_raw_backtrace () in
186+
match Trigger.await trigger with
187+
| None -> lock_forbidden mutex
188+
| Some exn_bt ->
189+
Awaitable.Awaiter.remove awaiter;
195190
lock_forbidden mutex;
196-
Printexc.raise_with_backtrace exn bt
191+
Printexc.raise_with_backtrace (fst exn_bt) (snd exn_bt)
197192
198193
let signal = Awaitable.signal
199194
let broadcast = Awaitable.broadcast

lib/picos_std.sync/dune

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
(public_name picos_std.sync)
44
(libraries
55
(re_export picos_std.event)
6-
picos
6+
picos_std.awaitable
77
backoff
88
multicore-magic))
99

lib/picos_std.sync/picos_std_sync.ml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@ module Lazy = Lazy
55
module Latch = Latch
66
module Ivar = Ivar
77
module Stream = Stream
8+
module Stack = Stack

lib/picos_std.sync/picos_std_sync.mli

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,25 @@ module Stream : sig
409409
the [cursor] position. *)
410410
end
411411

412+
module Stack : sig
413+
(** *)
414+
415+
type !'a t
416+
(** *)
417+
418+
val create : ?padded:bool -> ?capacity:int -> unit -> 'a t
419+
(** *)
420+
421+
val push : 'a t -> 'a -> unit
422+
(** *)
423+
424+
exception Empty
425+
(** *)
426+
427+
val pop_exn : 'a t -> 'a
428+
(** *)
429+
end
430+
412431
(** {1 Examples}
413432
414433
{2 A simple bounded queue}

0 commit comments

Comments
 (0)