Skip to content

Conversation

vog
Copy link

@vog vog commented Feb 9, 2025

For buffered serialization into a flow, at the moment there is only Buf_write.with_flow, so those serializers are expected to always run wrapped within a single function.

This PR introduces Buf_write.of_flow which allows a serializer to outlive its invocation, but instead being restricted by a surrounding Switch, as usual. Also, proper closing of the write is ensured by Switch.on_release in success as well as error cases.. This makes Buf_write more straight forward to use e.g. in network protocols, which otherwise need unfortunate workarounds like this one:

Moreover, Buf_write.with_flow is simplified by just calling Buf_write.of_flow within a local Switch.

@patricoferris
Copy link
Collaborator

Hi @vog,

Thank you for the PR.

Unfortunately this would change the semantics of with_flow compared to what was there before, which accounts for the failing tests. The forked copy action relies on the fact that the buffer will eventually be closed and this will raise an End_of_file exception. Moving this into the switch's on_release callbacks means this will now only succeed if the user manually closes the buffer in with_flow.

Also you will need to expose of_flow in the .mli file for people to be able to use it.

Note also that you can, with a little boiler plate, workaround this by doing things manually:

let of_flow ~sw flow =
  (* Copied from https://github.com/ocaml-multicore/eio/blob/62b9714f0e2ed8e72046c8e5808202e1e3ca9cd7/lib_eio/buf_write.ml#L507

     We have to do our own copy, because we can't [shift] until the write is complete. *)
  let copy t flow =
    let rec aux () =
      let iovecs = Eio.Buf_write.await_batch t in
      let wrote = Eio.Flow.single_write flow iovecs in
      Eio.Buf_write.shift t wrote;
      aux ()
    in
    try aux ()
    with End_of_file -> ()
  in
  let t = Eio.Buf_write.create ~sw 0x1000 in
  Eio.Switch.on_release sw (fun () -> Eio.Buf_write.close t);
  Fiber.fork ~sw (fun () -> copy t flow);
  t

Note that this will likely have the same problems as before (perhaps a fork_daemon might make more sense there) but it really depends on your use case. Hopefully that helps a bit.

@talex5
Copy link
Collaborator

talex5 commented Aug 22, 2025

The Eio API is trying to avoid the mistake where you think you've finished writing but the buffer hasn't been flushed yet. Adding of_flow makes that mistake easy again (you have to wait for the switch to go out of scope to know the writes have taken effect).

In the PR you link, perhaps it would be simpler to return the flow and let the caller wrap it in a buffer?

@vog
Copy link
Author

vog commented Aug 22, 2025

The Eio API is trying to avoid the mistake where you think you've finished writing but the buffer hasn't been flushed yet. Adding of_flow makes that mistake easy again (you have to wait for the switch to go out of scope to know the writes have taken effect).

Very good point. These types of bugs are really nasty and hard to pin down. However, in the context of buffered writers, aren't users of such buffers already aware of the danger, and used to having to call flushat appropriate times for exactly that reason?

In the PR you link, perhaps it would be simpler to return the flow and let the caller wrap it in a buffer?

Well, the "caller" is generic code that is independent from EIO, LWT, etc., and even if I would refactor all that code as well, I'm pretty sure this would sneak into the library's API surface.

However, maybe I'm overlooking a simple solution, and PRs for pgx-eio are always welcome, especially from experienced EIO developers.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants