diff --git a/bench/bench_stack.ml b/bench/bench_stack.ml new file mode 100644 index 00000000..5ccea8c3 --- /dev/null +++ b/bench/bench_stack.ml @@ -0,0 +1,86 @@ +open Multicore_bench +open Picos_std_sync + +let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () = + let t = Stack.create ~padded:true () in + + let op push = + if push then Stack.push t 101 + else match Stack.pop_exn t with _ -> () | exception Stack.Empty -> () + in + + let init _ = + assert ( + match Stack.pop_exn t with _ -> false | exception Stack.Empty -> true); + Util.generate_push_and_pop_sequence n_msgs + in + let work _ bits = Util.Bits.iter op bits in + + Times.record ~budgetf ~n_domains:1 ~init ~work () + |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain" + +let run_one ~budgetf ~n_adders ~n_takers () = + let n_domains = n_adders + n_takers in + + let n_msgs = 50 * Util.iter_factor in + + let t = Stack.create ~padded:true () in + + let n_msgs_to_add = Countdown.create ~n_domains:n_adders () in + let n_msgs_to_take = Countdown.create ~n_domains:n_takers () in + + let init _ = + assert ( + match Stack.pop_exn t with _ -> false | exception Stack.Empty -> true); + Countdown.non_atomic_set n_msgs_to_add n_msgs; + Countdown.non_atomic_set n_msgs_to_take n_msgs + in + let work i () = + if i < n_adders then + let rec work () = + let n = Countdown.alloc n_msgs_to_add ~domain_index:i ~batch:1000 in + if 0 < n then begin + for i = 1 to n do + Stack.push t i + done; + work () + end + in + work () + else + let i = i - n_adders in + let rec work () = + let n = Countdown.alloc n_msgs_to_take ~domain_index:i ~batch:1000 in + if 0 < n then + let rec loop n = + if 0 < n then begin + match Stack.pop_exn t with + | _ -> loop (n - 1) + | exception Stack.Empty -> + Backoff.once Backoff.default |> ignore; + loop n + end + else work () + in + loop n + in + work () + in + + let config = + let format role n = + Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s") + in + Printf.sprintf "%s, %s" + (format "nb adder" n_adders) + (format "nb taker" n_takers) + in + Times.record ~budgetf ~n_domains ~init ~work () + |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config + +let run_suite ~budgetf = + run_one_domain ~budgetf () + @ (Util.cross [ 1; 2; 4 ] [ 1; 2; 4 ] + |> List.concat_map @@ fun (n_adders, n_takers) -> + if Picos_domain.recommended_domain_count () < n_adders + n_takers then [] + else run_one ~budgetf ~n_adders ~n_takers ()) diff --git a/bench/dune b/bench/dune index e39661d7..09f62c1d 100644 --- a/bench/dune +++ b/bench/dune @@ -23,6 +23,7 @@ (run %{test} -brief "Picos binaries") (run %{test} -brief "Bounded_q with Picos_std_sync") (run %{test} -brief "Memory usage") + (run %{test} -brief "Stack") ;; )) (foreign_stubs diff --git a/bench/main.ml b/bench/main.ml index e26b3f38..8a1e2074 100644 --- a/bench/main.ml +++ b/bench/main.ml @@ -22,6 +22,7 @@ let benchmarks = ("Picos binaries", Bench_binaries.run_suite); ("Bounded_q with Picos_std_sync", Bench_bounded_q.run_suite); ("Memory usage", Bench_memory.run_suite); + ("Stack", Bench_stack.run_suite); ] let () = Multicore_bench.Cmd.run ~benchmarks () diff --git a/lib/picos_std.sync/picos_std_sync.ml b/lib/picos_std.sync/picos_std_sync.ml index e892f6c1..af7225b8 100644 --- a/lib/picos_std.sync/picos_std_sync.ml +++ b/lib/picos_std.sync/picos_std_sync.ml @@ -9,3 +9,4 @@ module Latch = Latch module Barrier = Barrier module Ivar = Ivar module Stream = Stream +module Stack = Stack diff --git a/lib/picos_std.sync/picos_std_sync.mli b/lib/picos_std.sync/picos_std_sync.mli index ef1095f6..20eec5e0 100644 --- a/lib/picos_std.sync/picos_std_sync.mli +++ b/lib/picos_std.sync/picos_std_sync.mli @@ -753,6 +753,25 @@ module Stream : sig the [cursor] position. *) end +module Stack : sig + (** *) + + type !'a t + (** *) + + val create : ?padded:bool -> ?capacity:int -> unit -> 'a t + (** *) + + val push : 'a t -> 'a -> unit + (** *) + + exception Empty + (** *) + + val pop_exn : 'a t -> 'a + (** *) +end + (** {1 Examples} {2 A simple bounded queue} diff --git a/lib/picos_std.sync/stack.ml b/lib/picos_std.sync/stack.ml new file mode 100644 index 00000000..c4745141 --- /dev/null +++ b/lib/picos_std.sync/stack.ml @@ -0,0 +1,59 @@ +open Picos_std_awaitable + +type 'a state = + | Nil of { capacity : int } + | Cons of { capacity : int; value : 'a; rest : 'a state } + +let[@inline] capacity_of = function Nil r -> r.capacity | Cons r -> r.capacity + +type 'a t = 'a state Awaitable.t + +exception Empty + +let max_capacity = Int.max_int + +let create ?padded ?capacity () = + let capacity = + match capacity with + | None -> max_capacity + | Some capacity -> + if capacity < 1 || max_capacity < capacity then invalid_arg "capacity" + else capacity + in + Awaitable.make ?padded (Nil { capacity }) + +let rec push_await t value backoff = + let before = Awaitable.get t in + let capacity = capacity_of before - 1 in + if 0 <= capacity then + let after = Cons { capacity; value; rest = before } in + if Awaitable.compare_and_set t before after then Awaitable.signal t + else push_await t value (Backoff.once backoff) + else begin + Awaitable.await t before; + push_await t value Backoff.default + end + +let rec push t value backoff = + let before = Awaitable.get t in + let capacity = capacity_of before - 1 in + if 0 <= capacity then + let after = Cons { capacity; value; rest = before } in + if Awaitable.compare_and_set t before after then + match before with Nil _ -> Awaitable.signal t | Cons _ -> () + else push t value (Backoff.once backoff) + else push_await t value backoff + +let rec pop_exn t backoff = + match Awaitable.get t with + | Nil _ -> raise_notrace Empty + | Cons r as before -> + if Awaitable.compare_and_set t before r.rest then begin + let value = r.value in + if r.capacity = 0 then Awaitable.signal t; + value + end + else pop_exn t (Backoff.once backoff) + +let[@inline] push t value = push t value Backoff.default +let[@inline] pop_exn t = pop_exn t Backoff.default