|
| 1 | + |
| 2 | +open Eio.Stdenv |
| 3 | +open Eio |
| 4 | +module Sync = Eio__Sync |
| 5 | + |
| 6 | +let sender_fibers = 4 |
| 7 | + |
| 8 | +let message = 1234 |
| 9 | + |
| 10 | +(* Send [n_msgs] items to streams in a round-robin way. *) |
| 11 | +let sender ~n_msgs streams = |
| 12 | + let msgs = Seq.take n_msgs (Seq.ints 0) in |
| 13 | + let streams = Seq.cycle (List.to_seq streams) in |
| 14 | + let zipped = Seq.zip msgs streams in |
| 15 | + ignore (Seq.iter (fun (_i, stream) -> |
| 16 | + Sync.put stream message) zipped) |
| 17 | + |
| 18 | +(* Start one sender fiber for each stream, and let it send n_msgs messages. |
| 19 | + Each fiber sends to all streams in a round-robin way. *) |
| 20 | +let run_senders ~dom_mgr ?(n_msgs = 100) streams = |
| 21 | + Switch.run @@ fun sw -> |
| 22 | + ignore @@ List.iter (fun _stream -> |
| 23 | + Fiber.fork ~sw (fun () -> |
| 24 | + Domain_manager.run dom_mgr (fun () -> |
| 25 | + sender ~n_msgs streams))) streams |
| 26 | + |
| 27 | +(* Receive messages from all streams. *) |
| 28 | +let receiver ~n_msgs streams = |
| 29 | + for _i = 1 to n_msgs do |
| 30 | + assert (Int.equal message (Sync.select_of_many streams)); |
| 31 | + done |
| 32 | + |
| 33 | +(* Create [n] streams. *) |
| 34 | +let make_streams n = |
| 35 | + let unfolder i = if i == 0 then None else Some (Sync.create (), i-1) in |
| 36 | + let seq = Seq.unfold unfolder n in |
| 37 | + List.of_seq seq |
| 38 | + |
| 39 | +let run env = |
| 40 | + let dom_mgr = domain_mgr env in |
| 41 | + let clock = clock env in |
| 42 | + let streams = make_streams sender_fibers in |
| 43 | + let selector = List.map (fun s -> (s, fun i -> i)) streams in |
| 44 | + let n_msgs = 10000 in |
| 45 | + Switch.run @@ fun sw -> |
| 46 | + Fiber.fork ~sw (fun () -> run_senders ~dom_mgr ~n_msgs streams); |
| 47 | + let before = Time.now clock in |
| 48 | + receiver ~n_msgs:(sender_fibers * n_msgs) selector; |
| 49 | + let after = Time.now clock in |
| 50 | + let elapsed = after -. before in |
| 51 | + let time_per_iter = elapsed /. (Float.of_int @@ sender_fibers * n_msgs) in |
| 52 | + [Metric.create |
| 53 | + (Printf.sprintf "sync:true senders:%d msgs_per_sender:%d" sender_fibers n_msgs) |
| 54 | + (`Float (1e9 *. time_per_iter)) "ns" |
| 55 | + "Time per transmitted int"] |
| 56 | + |
| 57 | + |
0 commit comments