From 29d41d4023f95933b04170c65267177195e0b526 Mon Sep 17 00:00:00 2001 From: Patrick Ferris Date: Thu, 27 Apr 2023 18:43:37 +0100 Subject: [PATCH 01/18] Posix-based windows implementation --- .github/workflows/main.yml | 18 ++ doc/dune | 1 + dune | 1 + dune-project | 6 +- eio_windows.opam | 7 +- eio_windows.opam.template | 3 + lib_eio/unix/fork/dune | 4 + lib_eio/unix/fork_action.c | 17 ++ lib_eio/unix/stubs.c | 4 + lib_eio_windows/children.ml | 88 +++++++ lib_eio_windows/children.mli | 14 ++ lib_eio_windows/domain_mgr.ml | 85 +++++++ lib_eio_windows/dune | 12 +- lib_eio_windows/eio_posix_stubs.c | 90 +++++++ lib_eio_windows/eio_windows.ml | 56 ++++- lib_eio_windows/eio_windows.mli | 24 ++ lib_eio_windows/err.ml | 29 +++ lib_eio_windows/fd.ml | 16 ++ lib_eio_windows/fd.mli | 49 ++++ lib_eio_windows/flow.ml | 86 +++++++ lib_eio_windows/fs.ml | 184 ++++++++++++++ lib_eio_windows/include/discover.ml | 27 +++ lib_eio_windows/include/dune | 4 + lib_eio_windows/low_level.ml | 282 +++++++++++++++++++++ lib_eio_windows/low_level.mli | 102 ++++++++ lib_eio_windows/net.ml | 158 ++++++++++++ lib_eio_windows/sched.ml | 364 ++++++++++++++++++++++++++++ lib_eio_windows/sched.mli | 44 ++++ lib_eio_windows/test/dune | 5 + lib_eio_windows/test/test.ml | 67 +++++ lib_eio_windows/time.ml | 19 ++ tests/dune | 1 + 32 files changed, 1860 insertions(+), 7 deletions(-) mode change 100644 => 100755 eio_windows.opam create mode 100755 eio_windows.opam.template create mode 100755 lib_eio/unix/fork/dune create mode 100755 lib_eio_windows/children.ml create mode 100755 lib_eio_windows/children.mli create mode 100755 lib_eio_windows/domain_mgr.ml create mode 100755 lib_eio_windows/eio_posix_stubs.c mode change 100644 => 100755 lib_eio_windows/eio_windows.ml create mode 100755 lib_eio_windows/eio_windows.mli create mode 100755 lib_eio_windows/err.ml create mode 100755 lib_eio_windows/fd.ml create mode 100755 lib_eio_windows/fd.mli create mode 100755 lib_eio_windows/flow.ml create mode 100755 lib_eio_windows/fs.ml create mode 100755 lib_eio_windows/include/discover.ml create mode 100755 lib_eio_windows/include/dune create mode 100755 lib_eio_windows/low_level.ml create mode 100755 lib_eio_windows/low_level.mli create mode 100755 lib_eio_windows/net.ml create mode 100755 lib_eio_windows/sched.ml create mode 100755 lib_eio_windows/sched.mli create mode 100755 lib_eio_windows/test/dune create mode 100755 lib_eio_windows/test/test.ml create mode 100755 lib_eio_windows/time.ml diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 61d06a0b5..cda6bc432 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -32,7 +32,25 @@ jobs: - run: opam --cli=2.1 pin -yn --with-version=dev . - run: opam install ${{ matrix.local-packages }} --deps-only --with-test - run: opam install ${{ matrix.local-packages }} --with-test + windows: + runs-on: windows-latest + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Set-up OCaml + uses: ocaml/setup-ocaml@v2 + with: + opam-pin: false + opam-depext: false + ocaml-compiler: ocaml.5.0.0,ocaml-option-mingw + opam-repositories: | + dra27: https://github.com/dra27/opam-repository.git#windows-5.0 + normal: https://github.com/ocaml/opam-repository.git + default: https://github.com/ocaml-opam/opam-repository-mingw.git#opam2 + - run: opam pin -yn . && opam install eio_windows --deps-only --with-test + - run: opam exec -- dune runtest docker: runs-on: ubuntu-latest steps: diff --git a/doc/dune b/doc/dune index e1a3c0d78..8ea33088a 100644 --- a/doc/dune +++ b/doc/dune @@ -1,4 +1,5 @@ (mdx (package eio_main) (deps (package eio_main) (env_var "EIO_BACKEND")) + (enabled_if (<> %{os_type} "Win32")) (files multicore.md)) diff --git a/dune b/dune index 60c8c18b0..e3e6643d1 100644 --- a/dune +++ b/dune @@ -2,4 +2,5 @@ (package eio_main) (deps (package eio_main) (env_var "EIO_BACKEND")) (preludes doc/prelude.ml) + (enabled_if (<> %{os_type} "Win32")) (files README.md)) diff --git a/dune-project b/dune-project index 5f8734aee..66cb2f54c 100644 --- a/dune-project +++ b/dune-project @@ -56,10 +56,12 @@ (package (name eio_windows) (synopsis "Eio implementation for Windows") - (description "An Eio implementation using I/O Completion Ports") + (description "An Eio implementation using OCaml's Unix.select") (allow_empty) ; Work-around for dune bug #6938 (depends - (eio (= :version)))) + (eio (= :version)) + (cstruct-unix (= dev)) + (alcotest (and (>= 1.4.0) :with-test)))) (package (name eio_main) (synopsis "Effect-based direct-style IO mainloop for OCaml") diff --git a/eio_windows.opam b/eio_windows.opam old mode 100644 new mode 100755 index bf718229f..9ff9f87bb --- a/eio_windows.opam +++ b/eio_windows.opam @@ -1,7 +1,7 @@ # This file is generated by dune, edit dune-project instead opam-version: "2.0" synopsis: "Eio implementation for Windows" -description: "An Eio implementation using I/O Completion Ports" +description: "An Eio implementation using OCaml's Unix.select" maintainer: ["anil@recoil.org"] authors: ["Anil Madhavapeddy" "Thomas Leonard"] license: "ISC" @@ -11,6 +11,8 @@ bug-reports: "https://github.com/ocaml-multicore/eio/issues" depends: [ "dune" {>= "3.7"} "eio" {= version} + "cstruct-unix" {= "dev"} + "alcotest" {>= "1.4.0" & with-test} "odoc" {with-doc} ] build: [ @@ -28,3 +30,6 @@ build: [ ] ] dev-repo: "git+https://github.com/ocaml-multicore/eio.git" +pin-depends: [ + [ "cstruct-unix.dev" "git+https://github.com/djs55/ocaml-cstruct#471ca03b49b3a372945fcf13c89e0447a8bd3932" ] +] \ No newline at end of file diff --git a/eio_windows.opam.template b/eio_windows.opam.template new file mode 100755 index 000000000..5ffe99e8b --- /dev/null +++ b/eio_windows.opam.template @@ -0,0 +1,3 @@ +pin-depends: [ + [ "cstruct-unix.dev" "git+https://github.com/djs55/ocaml-cstruct#471ca03b49b3a372945fcf13c89e0447a8bd3932" ] +] \ No newline at end of file diff --git a/lib_eio/unix/fork/dune b/lib_eio/unix/fork/dune new file mode 100755 index 000000000..dd2f8f134 --- /dev/null +++ b/lib_eio/unix/fork/dune @@ -0,0 +1,4 @@ +(library + (name eio_fork_actions) + (public_name eio.fork_actions) + (libraries )) \ No newline at end of file diff --git a/lib_eio/unix/fork_action.c b/lib_eio/unix/fork_action.c index 2c429862d..99c87ff7e 100644 --- a/lib_eio/unix/fork_action.c +++ b/lib_eio/unix/fork_action.c @@ -5,10 +5,14 @@ #include #include +#include #include "fork_action.h" void eio_unix_run_fork_actions(int errors, value v_actions) { + #ifdef _WIN32 + uerror("Unsupported operation on windows", Nothing); + #else int old_flags = fcntl(errors, F_GETFL, 0); fcntl(errors, F_SETFL, old_flags & ~O_NONBLOCK); while (Is_block(v_actions)) { @@ -18,6 +22,7 @@ void eio_unix_run_fork_actions(int errors, value v_actions) { v_actions = Field(v_actions, 1); } _exit(1); + #endif } static void try_write_all(int fd, char *buf) { @@ -67,6 +72,9 @@ CAMLprim value eio_unix_fork_execve(value v_unit) { } static void action_fchdir(int errors, value v_config) { + #ifdef _WIN32 + uerror("Unsupported operation on windows", Nothing); + #else value v_fd = Field(v_config, 1); int r; r = fchdir(Int_val(v_fd)); @@ -74,6 +82,7 @@ static void action_fchdir(int errors, value v_config) { eio_unix_fork_error(errors, "fchdir", strerror(errno)); _exit(1); } + #endif } CAMLprim value eio_unix_fork_fchdir(value v_unit) { @@ -95,6 +104,9 @@ CAMLprim value eio_unix_fork_chdir(value v_unit) { } static void set_blocking(int errors, int fd, int blocking) { + #ifdef _WIN32 + uerror("Unsupported operation on windows", Nothing); + #else int r = fcntl(fd, F_GETFL, 0); if (r != -1) { int flags = blocking @@ -108,9 +120,13 @@ static void set_blocking(int errors, int fd, int blocking) { eio_unix_fork_error(errors, "fcntl", strerror(errno)); _exit(1); } + #endif } static void set_cloexec(int errors, int fd, int cloexec) { + #ifdef _WIN32 + uerror("Unsupported operation on windows", Nothing); + #else int r = fcntl(fd, F_GETFD, 0); if (r != -1) { int flags = cloexec @@ -124,6 +140,7 @@ static void set_cloexec(int errors, int fd, int cloexec) { eio_unix_fork_error(errors, "fcntl", strerror(errno)); _exit(1); } + #endif } static void action_dups(int errors, value v_config) { diff --git a/lib_eio/unix/stubs.c b/lib_eio/unix/stubs.c index 968f3c20c..e6ff58bfc 100644 --- a/lib_eio/unix/stubs.c +++ b/lib_eio/unix/stubs.c @@ -5,6 +5,10 @@ #include CAMLprim value eio_unix_is_blocking(value v_fd) { + #ifdef _WIN32 + // We should not call this function from Windows + uerror("Unsupported blocking check on Windows", Nothing); + #else int fd = Int_val(v_fd); int r = fcntl(fd, F_GETFL, 0); if (r == -1) diff --git a/lib_eio_windows/children.ml b/lib_eio_windows/children.ml new file mode 100755 index 000000000..d5a12d139 --- /dev/null +++ b/lib_eio_windows/children.ml @@ -0,0 +1,88 @@ +(* Keep track of running child processes and notify their fiber when they exit. + After forking a child process, it gets registered in the global [db] along with a resolver + for the promise of its exit status. When we get a SIGCHLD signal, we reap all exited processes + and resolve their promises, waking whichever fibers are waiting for them. + + We have to be careful not to use a PID after [wait] reaps it, as the PID could have been reused by then. + + The signal handler can run in any domain or systhread, so we have to be careful about that too. + We can't defer the call to [wait] until we're running in an Eio domain as we don't know which domain + should handle it until [wait] gives as the process ID. We don't want to delegate to a particular domain + because it might be spinning doing CPU stuff for a long time. Instead, we try to take the lock in the + signal handler and do it there. If we can't get the lock then we just record that a wait is needed; + whoever holds the lock will soon release it and will do the reaping for us. + + Note that, since signal handlers are global, + this will interfere with any libraries trying to manage processes themselves. + + For systems with Process Descriptors we could skip all this nonsense and + just poll on the process's FD. e.g. using [pdfork] on FreeBSD or [CLONE_PIDFD] on Linux. *) + +open Eio.Std + +(* Each child process is registered in this table. + Must hold [lock] when accessing it. *) +let db : (int, Unix.process_status Promise.u) Hashtbl.t = Hashtbl.create 10 + +(* Set to [true] when we receive [SIGCHLD] and [false] before calling [wait]. *) +let need_wait = Atomic.make false + +(* [lock] must be held when spawning or reaping. Otherwise, this can happen: + + - We spawn process 100, adding it to [db]. + - It exits, sending us SIGCHLD. + - The signal handler calls [wait], reaping it. + - Another domain spawns another process 100 and adds it to [db], + overwriting the previous entry. + - The signal handler resumes, and gets the wrong entry. + + If [lock] is already locked when the SIGCHLD handler runs then it just leaves [need_wait = true] + (a signal handler can't wait on a mutex, since it may have interrupted the holder). + The unlocker needs to check [need_wait] after releasing the lock. *) +let lock = Mutex.create () + +(* [pid] has exited. Notify the waiter. Must hold [lock] when calling this. *) +let report_child_status pid status = + match Hashtbl.find_opt db pid with + | Some r -> + Hashtbl.remove db pid; + Promise.resolve r status + | None -> + (* Not one of ours. Not much we can do here. The spawner will probably get + an [ECHILD] error when they wait, which will do for the error. *) + () + +(* Must hold [lock] when calling this. *) +let rec reap () = + Atomic.set need_wait false; + match Unix.(waitpid [WNOHANG] (-1)) with + | 0, _ -> () (* Returned if there are children but none has exited yet. *) + | pid, status -> report_child_status pid status; reap () + | exception Unix.Unix_error (EINTR, _, _) -> reap () + | exception Unix.Unix_error (ECHILD, _, _) -> () (* Returned if there are no children at all. *) + +let rec reap_nonblocking () = + if Mutex.try_lock lock then ( + reap (); + Mutex.unlock lock; + if Atomic.get need_wait then reap_nonblocking () + ) (* else the unlocker will see [need_wait] and call us later *) + +let unlock () = + Mutex.unlock lock; + if Atomic.get need_wait then reap_nonblocking () + +(* Must hold [lock] when calling this. *) +let register pid = + assert (not (Hashtbl.mem db pid)); + let p, r = Promise.create () in + Hashtbl.add db pid r; + p + +let with_lock fn = + Mutex.lock lock; + Fun.protect fn ~finally:unlock + +let handle_sigchld () = + Atomic.set need_wait true; + reap_nonblocking () diff --git a/lib_eio_windows/children.mli b/lib_eio_windows/children.mli new file mode 100755 index 000000000..a8ead2276 --- /dev/null +++ b/lib_eio_windows/children.mli @@ -0,0 +1,14 @@ +(** Keep track of child processes and respond to SIGCHLD. *) + +val with_lock : (unit -> 'a) -> 'a +(** This must be held during the (fork, register) sequence + (so that we don't try to reap the process before it's registered), + and also when signalling a child process + (to ensure it isn't reaped at the same time). *) + +val register : int -> Unix.process_status Eio.Promise.t +(** [register pid] adds [pid] to the list of children and returns a promise for its exit status. + You must hold the lock while forking and then calling this. *) + +val handle_sigchld : unit -> unit +(** Call this on [SIGCHLD]. *) diff --git a/lib_eio_windows/domain_mgr.ml b/lib_eio_windows/domain_mgr.ml new file mode 100755 index 000000000..1c054d83e --- /dev/null +++ b/lib_eio_windows/domain_mgr.ml @@ -0,0 +1,85 @@ +(* + * Copyright (C) 2023 Thomas Leonard + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + *) + +open Eio.Std + +[@@@alert "-unstable"] + +module Fd = Eio_unix.Fd + +(* Run an event loop in the current domain, using [fn x] as the root fiber. *) +let run_event_loop fn x = + Sched.with_sched @@ fun sched -> + let open Effect.Deep in + let extra_effects : _ effect_handler = { + effc = fun (type a) (e : a Effect.t) : ((a, Sched.exit) continuation -> Sched.exit) option -> + match e with + | Eio_unix.Private.Get_monotonic_clock -> Some (fun k -> continue k (Time.mono_clock : Eio.Time.Mono.t)) + | Eio_unix.Private.Socket_of_fd (sw, close_unix, unix_fd) -> Some (fun k -> + let fd = Fd.of_unix ~sw ~blocking:false ~close_unix unix_fd in + Unix.set_nonblock unix_fd; + continue k (Flow.of_fd fd :> Eio_unix.socket) + ) + | Eio_unix.Private.Socketpair (sw, domain, ty, protocol) -> Some (fun k -> + match + let unix_a, unix_b = Unix.socketpair ~cloexec:true domain ty protocol in + let a = Fd.of_unix ~sw ~blocking:false ~close_unix:true unix_a in + let b = Fd.of_unix ~sw ~blocking:false ~close_unix:true unix_b in + Unix.set_nonblock unix_a; + Unix.set_nonblock unix_b; + (Flow.of_fd a :> Eio_unix.socket), (Flow.of_fd b :> Eio_unix.socket) + with + | r -> continue k r + | exception Unix.Unix_error (code, name, arg) -> + discontinue k (Err.wrap code name arg) + ) + | Eio_unix.Private.Pipe sw -> Some (fun k -> + match + let r, w = Low_level.pipe ~sw in + let source = (Flow.of_fd r :> Eio_unix.source) in + let sink = (Flow.of_fd w :> Eio_unix.sink) in + (source, sink) + with + | r -> continue k r + | exception Unix.Unix_error (code, name, arg) -> + discontinue k (Err.wrap code name arg) + ) + | _ -> None + } + in + Sched.run ~extra_effects sched fn x + +let v = object + inherit Eio.Domain_manager.t + + method run_raw fn = + let domain = ref None in + Eio.Private.Suspend.enter (fun _ctx enqueue -> + domain := Some (Domain.spawn (fun () -> Fun.protect fn ~finally:(fun () -> enqueue (Ok ())))) + ); + Domain.join (Option.get !domain) + + method run fn = + let domain = ref None in + Eio.Private.Suspend.enter (fun ctx enqueue -> + let cancelled, set_cancelled = Promise.create () in + Eio.Private.Fiber_context.set_cancel_fn ctx (Promise.resolve set_cancelled); + domain := Some (Domain.spawn (fun () -> + Fun.protect (run_event_loop (fun () -> fn ~cancelled)) + ~finally:(fun () -> enqueue (Ok ())))) + ); + Domain.join (Option.get !domain) +end diff --git a/lib_eio_windows/dune b/lib_eio_windows/dune index 9e2203057..aee7582f1 100644 --- a/lib_eio_windows/dune +++ b/lib_eio_windows/dune @@ -1,5 +1,15 @@ (library (name eio_windows) (public_name eio_windows) + (library_flags :standard -ccopt -lbcrypt) (enabled_if (= %{os_type} "Win32")) - (libraries eio eio.utils fmt)) + (foreign_stubs + (language c) + (flags :standard -D_LARGEFILE64_SOURCE) + (include_dirs ../lib_eio/unix/include) + (names eio_posix_stubs)) + (libraries eio eio.unix eio.utils fmt cstruct-unix)) + +(rule + (targets config.ml) + (action (run ./include/discover.exe))) \ No newline at end of file diff --git a/lib_eio_windows/eio_posix_stubs.c b/lib_eio_windows/eio_posix_stubs.c new file mode 100755 index 000000000..fd91b3fae --- /dev/null +++ b/lib_eio_windows/eio_posix_stubs.c @@ -0,0 +1,90 @@ +#define _FILE_OFFSET_BITS 64 + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#ifdef ARCH_SIXTYFOUR +#define Int63_val(v) Long_val(v) +#else +#define Int63_val(v) (Int64_val(v)) >> 1 +#endif + +static void caml_stat_free_preserving_errno(void *ptr) +{ + int saved = errno; + caml_stat_free(ptr); + errno = saved; +} + +CAMLprim value caml_eio_windows_getrandom(value v_ba, value v_off, value v_len) +{ + CAMLparam1(v_ba); + ssize_t ret; + ssize_t off = (ssize_t)Long_val(v_off); + ssize_t len = (ssize_t)Long_val(v_len); + do + { + void *buf = (uint8_t *)Caml_ba_data_val(v_ba) + off; + caml_enter_blocking_section(); + ret = BCryptGenRandom(NULL, buf, len, BCRYPT_USE_SYSTEM_PREFERRED_RNG); + caml_leave_blocking_section(); + } while (errno == EINTR); + if (ret != STATUS_SUCCESS) + uerror("getrandom", Nothing); + CAMLreturn(Val_long(len)); +} + +CAMLprim value caml_eio_windows_readv(value v_fd, value v_bufs) +{ + uerror("Readv is not supported on windows yet", Nothing); +} + +CAMLprim value caml_eio_windows_preadv(value v_fd, value v_bufs, value v_offset) +{ + uerror("Preadv is not supported on windows yet", Nothing); +} + +CAMLprim value caml_eio_windows_pwritev(value v_fd, value v_bufs, value v_offset) +{ + uerror("Pwritev is not supported on windows yet", Nothing); +} + +CAMLprim value caml_eio_windows_openat(value v_dirfd, value v_pathname, value v_flags, value v_mode) +{ + uerror("Readv is not supported on windows yet", Nothing); +} + +CAMLprim value caml_eio_windows_mkdirat(value v_fd, value v_path, value v_perm) +{ + uerror("mkdirat is not supported on windows yet", Nothing); +} + +CAMLprim value caml_eio_windows_unlinkat(value v_fd, value v_path, value v_dir) +{ + uerror("unlinkat is not supported on windows yet", Nothing); +} + +CAMLprim value caml_eio_windows_renameat(value v_old_fd, value v_old_path, value v_new_fd, value v_new_path) +{ + uerror("renameat is not supported on windows yet", Nothing); +} + +CAMLprim value caml_eio_windows_spawn(value v_errors, value v_actions) +{ + uerror("Processes are not supported on windows yet", Nothing); +} diff --git a/lib_eio_windows/eio_windows.ml b/lib_eio_windows/eio_windows.ml old mode 100644 new mode 100755 index 2cee04113..0e5cb59f5 --- a/lib_eio_windows/eio_windows.ml +++ b/lib_eio_windows/eio_windows.ml @@ -1,3 +1,53 @@ -(* Can base this on the eio_posix directory structure. - See HACKING.md for instructions on creating a new backend. *) -let run _main = failwith "TODO: Windows support." +(* + * Copyright (C) 2023 Thomas Leonard + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + *) + +module Low_level = Low_level + +type stdenv = < + stdin : ; + stdout : ; + stderr : ; + net : Eio.Net.t; + domain_mgr : Eio.Domain_manager.t; + clock : Eio.Time.clock; + mono_clock : Eio.Time.Mono.t; + fs : Eio.Fs.dir Eio.Path.t; + cwd : Eio.Fs.dir Eio.Path.t; + secure_random : Eio.Flow.source; + debug : Eio.Debug.t; +> + +let run main = + (* SIGPIPE makes no sense in a modern application. *) + (* These don't work on Windows. *) + (* Sys.(set_signal sigpipe Signal_ignore); *) + (* Sys.(set_signal sigchld (Signal_handle (fun (_:int) -> Children.handle_sigchld ()))); *) + let stdin = (Flow.of_fd Eio_unix.Fd.stdin :> ) in + let stdout = (Flow.of_fd Eio_unix.Fd.stdout :> ) in + let stderr = (Flow.of_fd Eio_unix.Fd.stderr :> ) in + Domain_mgr.run_event_loop main @@ object (_ : stdenv) + method stdin = stdin + method stdout = stdout + method stderr = stderr + method debug = Eio.Private.Debug.v + method clock = Time.clock + method mono_clock = Time.mono_clock + method net = Net.v + method domain_mgr = Domain_mgr.v + method cwd = ((Fs.cwd, "") :> Eio.Fs.dir Eio.Path.t) + method fs = ((Fs.fs, "") :> Eio.Fs.dir Eio.Path.t) + method secure_random = Flow.secure_random + end diff --git a/lib_eio_windows/eio_windows.mli b/lib_eio_windows/eio_windows.mli new file mode 100755 index 000000000..259970bbc --- /dev/null +++ b/lib_eio_windows/eio_windows.mli @@ -0,0 +1,24 @@ +(** Fallback Eio backend for POSIX systems. *) + +type stdenv = < + stdin : ; + stdout : ; + stderr : ; + net : Eio.Net.t; + domain_mgr : Eio.Domain_manager.t; + clock : Eio.Time.clock; + mono_clock : Eio.Time.Mono.t; + fs : Eio.Fs.dir Eio.Path.t; + cwd : Eio.Fs.dir Eio.Path.t; + secure_random : Eio.Flow.source; + debug : Eio.Debug.t; +> +(** An extended version of {!Eio.Stdenv.t} with some extra features available on POSIX systems. *) + +val run : (stdenv -> 'a) -> 'a +(** [run main] runs an event loop and calls [main stdenv] inside it. + + For portable code, you should use {!Eio_main.run} instead, which will call this for you if appropriate. *) + +module Low_level = Low_level +(** Low-level API for making POSIX calls directly. *) diff --git a/lib_eio_windows/err.ml b/lib_eio_windows/err.ml new file mode 100755 index 000000000..f813a06c1 --- /dev/null +++ b/lib_eio_windows/err.ml @@ -0,0 +1,29 @@ +type Eio.Exn.Backend.t += + | Outside_sandbox of string * string + | Absolute_path + | Invalid_leaf of string + +let unclassified_error e = Eio.Exn.create (Eio.Exn.X e) + +let () = + Eio.Exn.Backend.register_pp (fun f -> function + | Outside_sandbox (path, dir) -> Fmt.pf f "Outside_sandbox (%S, %S)" path dir; true + | Absolute_path -> Fmt.pf f "Absolute_path"; true + | Invalid_leaf x -> Fmt.pf f "Invalid_leaf %S" x; true + | _ -> false + ) + +let wrap code name arg = + let e = Eio_unix.Unix_error (code, name, arg) in + match code with + | EEXIST -> Eio.Fs.err (Already_exists e) + | ENOENT -> Eio.Fs.err (Not_found e) + | EXDEV | EACCES | EPERM -> Eio.Fs.err (Permission_denied e) + | ECONNREFUSED -> Eio.Net.err (Connection_failure (Refused e)) + | ECONNRESET | EPIPE -> Eio.Net.err (Connection_reset e) + | _ -> unclassified_error e + +let run fn x = + try fn x + with Unix.Unix_error (code, name, arg) -> + raise (wrap code name arg) diff --git a/lib_eio_windows/fd.ml b/lib_eio_windows/fd.ml new file mode 100755 index 000000000..fec405e81 --- /dev/null +++ b/lib_eio_windows/fd.ml @@ -0,0 +1,16 @@ +(* Deprecated *) + +include Eio_unix.Fd + +let compare = Stdlib.compare + +type has_fd = Eio_unix.Resource.t + +let to_unix op t = + match op with + | `Take -> remove t |> Option.get + | `Peek -> use_exn "to_unix" t Fun.id + +let of_unix ~sw ~blocking ~close_unix fd = of_unix ~sw ~blocking ~close_unix fd + +let get_fd_opt = Eio_unix.Resource.fd_opt diff --git a/lib_eio_windows/fd.mli b/lib_eio_windows/fd.mli new file mode 100755 index 000000000..d58bf2ca4 --- /dev/null +++ b/lib_eio_windows/fd.mli @@ -0,0 +1,49 @@ +(** A safe wrapper for {!Unix.file_descr}. *) + +open Eio.Std + +type t = Eio_unix.Fd.t +(** A wrapper around a {!Unix.file_descr}. *) + +val compare : t -> t -> int + +val of_unix : sw:Switch.t -> blocking:bool -> close_unix:bool -> Unix.file_descr -> t +(** [of_unix ~sw ~blocking ~close_unix fd] wraps [fd]. + + @param sw Close [fd] automatically when [sw] is finished. + @param blocking Indicates whether [fd] is in blocking mode. + Normally you should call [Unix.set_nonblock fd] first and pass [false] here. + @param close_unix Whether {!close} also closes [fd] (this should normally be [true]). *) + +val use_exn : string -> t -> (Unix.file_descr -> 'a) -> 'a +(** [use_exn op t fn] calls [fn wrapped_fd], ensuring that [wrapped_fd] will not be closed + before [fn] returns. + + If [t] is already closed, it raises an exception, using [op] as the name of the failing operation. *) + +val close : t -> unit +(** [close t] marks [t] as closed, so that {!use_exn} can no longer be used to start new operations. + + The wrapped FD will be closed once all current users of the FD have finished (unless [close_unix = false]). + + @raise Invalid_argument if [t] is closed by another fiber first. *) + +val is_blocking : t -> bool +(** [is_blocking t] returns the value of [blocking] passed to {!of_unix}. *) + +val stdin : t +val stdout : t +val stderr : t + +val to_unix : [`Peek | `Take] -> t -> Unix.file_descr +(** [to_unix `Take t] closes [t] without closing the wrapped FD, which it returns to the caller once all operations on it have finished. + + [to_unix `Peek t] returns the wrapped FD directly. You must ensure that it is not closed while using it. *) + +type has_fd = < fd : t > +(** Resources that have FDs are sub-types of [has_fd]. *) + +val get_fd_opt : #Eio.Generic.t -> t option +(** [get_fd_opt r] returns the [t] being wrapped by a resource, if any. + + This just probes [r] using {!FD}. *) diff --git a/lib_eio_windows/flow.ml b/lib_eio_windows/flow.ml new file mode 100755 index 000000000..e541b18e2 --- /dev/null +++ b/lib_eio_windows/flow.ml @@ -0,0 +1,86 @@ +module Fd = Eio_unix.Fd + +let fstat fd = + try + let ust = Low_level.fstat fd in + let st_kind : Eio.File.Stat.kind = + match ust.st_kind with + | Unix.S_REG -> `Regular_file + | Unix.S_DIR -> `Directory + | Unix.S_CHR -> `Character_special + | Unix.S_BLK -> `Block_device + | Unix.S_LNK -> `Symbolic_link + | Unix.S_FIFO -> `Fifo + | Unix.S_SOCK -> `Socket + in + Eio.File.Stat.{ + dev = ust.st_dev |> Int64.of_int; + ino = ust.st_ino |> Int64.of_int; + kind = st_kind; + perm = ust.st_perm; + nlink = ust.st_nlink |> Int64.of_int; + uid = ust.st_uid |> Int64.of_int; + gid = ust.st_gid |> Int64.of_int; + rdev = ust.st_rdev |> Int64.of_int; + size = ust.st_size |> Optint.Int63.of_int64; + atime = ust.st_atime; + mtime = ust.st_mtime; + ctime = ust.st_ctime; + } + with Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap code name arg + +let write_bufs fd bufs = + try + Low_level.writev fd bufs + with Unix.Unix_error (code, name, arg) -> raise (Err.wrap code name arg) + +let copy src dst = + let buf = Cstruct.create 4096 in + try + while true do + let got = Eio.Flow.single_read src buf in + write_bufs dst [Cstruct.sub buf 0 got] + done + with End_of_file -> () + +let read fd buf = + match Low_level.read_cstruct fd buf with + | 0 -> raise End_of_file + | got -> got + | exception (Unix.Unix_error (code, name, arg)) -> raise (Err.wrap code name arg) + +let shutdown fd cmd = + try + Low_level.shutdown fd @@ match cmd with + | `Receive -> Unix.SHUTDOWN_RECEIVE + | `Send -> Unix.SHUTDOWN_SEND + | `All -> Unix.SHUTDOWN_ALL + with Unix.Unix_error (code, name, arg) -> raise (Err.wrap code name arg) + +let of_fd fd = object (_ : ) + method fd = fd + + method read_methods = [] + method copy src = copy src fd + + method pread ~file_offset bufs = Low_level.preadv ~file_offset fd (Array.of_list bufs) + method pwrite ~file_offset bufs = Low_level.pwritev ~file_offset fd (Array.of_list bufs) + + method stat = fstat fd + method read_into buf = read fd buf + method write bufs = write_bufs fd bufs + method shutdown cmd = shutdown fd cmd + method close = Fd.close fd + + method probe : type a. a Eio.Generic.ty -> a option = function + | Eio_unix.Resource.FD -> Some fd + | _ -> None +end + +let secure_random = object + inherit Eio.Flow.source + + method read_into buf = + Low_level.getrandom buf; + Cstruct.length buf +end diff --git a/lib_eio_windows/fs.ml b/lib_eio_windows/fs.ml new file mode 100755 index 000000000..7afeba2f7 --- /dev/null +++ b/lib_eio_windows/fs.ml @@ -0,0 +1,184 @@ +(* + * Copyright (C) 2023 Thomas Leonard + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + *) + +(* This module provides (optional) sandboxing, allowing operations to be restricted to a subtree. + + For now, sandboxed directories use realpath and [O_NOFOLLOW], which is probably quite slow, + and requires duplicating a load of path lookup logic from the kernel. + It might be better to hold a directory FD rather than a path. + On FreeBSD we could use O_RESOLVE_BENEATH and let the OS handle everything for us. + On other systems we would have to resolve one path component at a time. *) + +open Eio.Std + +module Fd = Eio_unix.Fd + +class virtual posix_dir = object + inherit Eio.Fs.dir + + val virtual opt_nofollow : Low_level.Open_flags.t + (** Extra flags for open operations. Sandboxes will add [O_NOFOLLOW] here. *) + + method virtual private resolve : string -> string + (** [resolve path] returns the real path that should be used to access [path]. + For sandboxes, this is [realpath path] (and it checks that it is within the sandbox). + For unrestricted access, this is the identity function. *) + + method virtual with_parent_dir : 'a. (string -> (Fd.t option -> string -> 'a) -> 'a) + (** [with_parent_dir path fn] runs [fn dir_fd rel_path], + where [rel_path] accessed relative to [dir_fd] gives access to [path]. + For unrestricted access, this just runs [fn None path]. + For sandboxes, it opens the parent of [path] as [dir_fd] and runs [fn (Some dir_fd) (basename path)]. *) +end + +(* When renaming, we get a plain [Eio.Fs.dir]. We need extra access to check + that the new location is within its sandbox. *) +type _ Eio.Generic.ty += Posix_dir : posix_dir Eio.Generic.ty +let as_posix_dir x = Eio.Generic.probe x Posix_dir + +class virtual dir ~label = object (self) + inherit posix_dir + + val mutable closed = false + + method! probe : type a. a Eio.Generic.ty -> a option = function + | Posix_dir -> Some (self :> posix_dir) + | _ -> None + + method open_in ~sw path = + let fd = Err.run (Low_level.openat ~mode:0 ~sw (self#resolve path)) Low_level.Open_flags.(opt_nofollow + rdonly) in + (Flow.of_fd fd :> ) + + method open_out ~sw ~append ~create path = + let mode, flags = + match create with + | `Never -> 0, Low_level.Open_flags.empty + | `If_missing perm -> perm, Low_level.Open_flags.creat + | `Or_truncate perm -> perm, Low_level.Open_flags.(creat + trunc) + | `Exclusive perm -> perm, Low_level.Open_flags.(creat + excl) + in + let flags = if append then Low_level.Open_flags.(flags + append) else flags in + let flags = Low_level.Open_flags.(flags + rdwr + opt_nofollow) in + match + self#with_parent_dir path @@ fun dirfd path -> + Low_level.openat ?dirfd ~sw ~mode path flags + with + | fd -> (Flow.of_fd fd :> ) + | exception Unix.Unix_error (ELOOP, _, _) -> + (* The leaf was a symlink (or we're unconfined and the main path changed, but ignore that). + A leaf symlink might be OK, but we need to check it's still in the sandbox. + todo: possibly we should limit the number of redirections here, like the kernel does. *) + let target = Unix.readlink path in + let full_target = + if Filename.is_relative target then + Filename.concat (Filename.dirname path) target + else target + in + self#open_out ~sw ~append ~create full_target + | exception Unix.Unix_error (code, name, arg) -> + raise (Err.wrap code name arg) + + method mkdir ~perm path = + self#with_parent_dir path @@ fun dirfd path -> + Err.run (Low_level.mkdir ?dirfd ~mode:perm) path + + method unlink path = + self#with_parent_dir path @@ fun dirfd path -> + Err.run (Low_level.unlink ?dirfd ~dir:false) path + + method rmdir path = + self#with_parent_dir path @@ fun dirfd path -> + Err.run (Low_level.unlink ?dirfd ~dir:true) path + + method read_dir path = + (* todo: need fdopendir here to avoid races *) + let path = self#resolve path in + Err.run Low_level.readdir path + |> Array.to_list + + method rename old_path new_dir new_path = + match as_posix_dir new_dir with + | None -> invalid_arg "Target is not an eio_posix directory!" + | Some new_dir -> + self#with_parent_dir old_path @@ fun old_dir old_path -> + new_dir#with_parent_dir new_path @@ fun new_dir new_path -> + Err.run (Low_level.rename ?old_dir old_path ?new_dir) new_path + + method open_dir ~sw path = + Switch.check sw; + let label = Filename.basename path in + let d = new sandbox ~label (self#resolve path) in + Switch.on_release sw (fun () -> d#close); + (d :> Eio.Fs.dir_with_close) + + method close = closed <- true + + method pp f = Fmt.string f (String.escaped label) +end + +and sandbox ~label dir_path = object (self) + inherit dir ~label + + val opt_nofollow = Low_level.Open_flags.empty + (* TODO: Low_level.Open_flags.nofollow *) + + (* Resolve a relative path to an absolute one, with no symlinks. + @raise Eio.Fs.Permission_denied if it's outside of [dir_path]. *) + method private resolve path = + if closed then Fmt.invalid_arg "Attempt to use closed directory %S" dir_path; + if Filename.is_relative path then ( + let dir_path = Err.run Low_level.realpath dir_path in + let full = Err.run Low_level.realpath (Filename.concat dir_path path) in + let prefix_len = String.length dir_path + 1 in + if String.length full >= prefix_len && String.sub full 0 prefix_len = dir_path ^ Filename.dir_sep then + full + else if full = dir_path then + full + else + raise @@ Eio.Fs.err (Permission_denied (Err.Outside_sandbox (full, dir_path))) + ) else ( + raise @@ Eio.Fs.err (Permission_denied Err.Absolute_path) + ) + + method with_parent_dir path fn = + if closed then Fmt.invalid_arg "Attempt to use closed directory %S" dir_path; + let dir, leaf = Filename.dirname path, Filename.basename path in + if leaf = ".." then ( + (* We could be smarter here and normalise the path first, but '..' + doesn't make sense for any of the current uses of [with_parent_dir] + anyway. *) + raise (Eio.Fs.err (Permission_denied (Err.Invalid_leaf leaf))) + ) else ( + let dir = self#resolve dir in + Switch.run @@ fun sw -> + (* TODO: Directory and no-follow needed *) + let dirfd = Low_level.openat ~sw ~mode:0 dir Low_level.Open_flags.rdonly in + fn (Some dirfd) leaf + ) +end + +(* Full access to the filesystem. *) +let fs = object + inherit dir ~label:"fs" + + val opt_nofollow = Low_level.Open_flags.empty + + (* No checks *) + method private resolve path = path + method private with_parent_dir path fn = fn None path +end + +let cwd = new sandbox ~label:"cwd" "." diff --git a/lib_eio_windows/include/discover.ml b/lib_eio_windows/include/discover.ml new file mode 100755 index 000000000..136d1200a --- /dev/null +++ b/lib_eio_windows/include/discover.ml @@ -0,0 +1,27 @@ +module C = Configurator.V1 + +let () = + C.main ~name:"discover" (fun c -> + let defs = + C.C_define.import c ~c_flags:["-D_LARGEFILE64_SOURCE"] + ~includes:["sys/types.h"; "sys/stat.h"; "fcntl.h"] + C.C_define.Type.[ + "_O_RDONLY", Int; + "_O_RDWR", Int; + "_O_WRONLY", Int; + "_O_APPEND", Int; + "_O_CREAT", Int; + "_O_NOINHERIT", Int; + "_O_TRUNC", Int; + "_O_EXCL", Int; + ] + |> List.map (function + | name, C.C_define.Value.Int v -> + let name_length = String.length name in + let name = String.sub name 1 (name_length - 1) in + Printf.sprintf "let %s = 0x%x" (String.lowercase_ascii name) v + | _ -> assert false + ) + in + C.Flags.write_lines "config.ml" defs + ) diff --git a/lib_eio_windows/include/dune b/lib_eio_windows/include/dune new file mode 100755 index 000000000..db98d61d5 --- /dev/null +++ b/lib_eio_windows/include/dune @@ -0,0 +1,4 @@ +(executable + (name discover) + (modules discover) + (libraries dune-configurator)) diff --git a/lib_eio_windows/low_level.ml b/lib_eio_windows/low_level.ml new file mode 100755 index 000000000..73714c36c --- /dev/null +++ b/lib_eio_windows/low_level.ml @@ -0,0 +1,282 @@ +open Eio.Std + +(* There are some things that should be improved here: + + - Blocking FDs (e.g. stdout) wait for the FD to become ready and then do a blocking operation. + This might not succeed, and will block the whole domain in that case. + Ideally, all blocking operations should happen in a sys-thread instead. + + - Various other operations, such as listing a directory, should also be done in a sys-thread + to avoid high latencies in the main domain. *) + +type ty = Read | Write + +module Fd = Fd + +(* todo: keeping a pool of workers is probably faster *) +let in_worker_thread = Eio_unix.run_in_systhread + +let await_readable fd = + Fd.use_exn "await_readable" fd @@ fun fd -> + Sched.enter @@ fun t k -> + Sched.await_readable t k fd + +let await_writable fd = + Fd.use_exn "await_writable" fd @@ fun fd -> + Sched.enter @@ fun t k -> + Sched.await_writable t k fd + +let rec do_nonblocking ty fn fd = + Fiber.yield (); + try fn fd with + | Unix.Unix_error (EINTR, _, _) -> + do_nonblocking ty fn fd (* Just in case *) + | Unix.Unix_error((EAGAIN | EWOULDBLOCK), _, _) -> + Sched.enter (fun t k -> + match ty with + | Read -> Sched.await_readable t k fd + | Write -> Sched.await_writable t k fd + ); + do_nonblocking ty fn fd + +let read fd buf start len = + Fd.use_exn "read" fd @@ fun fd -> + do_nonblocking Read (fun fd -> Unix.read fd buf start len) fd + +let read_cstruct fd buf = + Fd.use_exn "read_cstruct" fd @@ fun fd -> + do_nonblocking Read (fun fd -> Unix_cstruct.read fd buf) fd + +let write fd buf start len = + Fd.use_exn "write" fd @@ fun fd -> + do_nonblocking Write (fun fd -> Unix.write fd buf start len) fd + +let sleep_until time = + Sched.enter @@ fun t k -> + Sched.await_timeout t k time + +let socket ~sw socket_domain socket_type protocol = + Switch.check sw; + let sock_unix = Unix.socket ~cloexec:true socket_domain socket_type protocol in + Unix.set_nonblock sock_unix; + Fd.of_unix ~sw ~blocking:false ~close_unix:true sock_unix + +let connect fd addr = + try + Fd.use_exn "connect" fd (fun fd -> Unix.connect fd addr) + with + | Unix.Unix_error ((EINTR | EAGAIN | EWOULDBLOCK | EINPROGRESS), _, _) -> + await_writable fd; + match Fd.use_exn "connect" fd Unix.getsockopt_error with + | None -> () + | Some code -> raise (Err.wrap code "connect-in-progress" "") + +let accept ~sw sock = + Fd.use_exn "accept" sock @@ fun sock -> + let client, addr = + do_nonblocking Read (fun fd -> Switch.check sw; Unix.accept ~cloexec:true fd) sock + in + Unix.set_nonblock client; + Fd.of_unix ~sw ~blocking:false ~close_unix:true client, addr + +let shutdown sock cmd = + Fd.use_exn "shutdown" sock (fun fd -> Unix.shutdown fd cmd) + +let send_msg fd ~dst buf = + Fd.use_exn "send_msg" fd @@ fun fd -> + do_nonblocking Write (fun fd -> Unix.sendto fd buf 0 (Bytes.length buf) [] dst) fd + +let recv_msg fd buf = + Fd.use_exn "recv_msg" fd @@ fun fd -> + do_nonblocking Read (fun fd -> Unix.recvfrom fd buf 0 (Bytes.length buf) []) fd + +external eio_getrandom : Cstruct.buffer -> int -> int -> int = "caml_eio_windows_getrandom" + +let getrandom { Cstruct.buffer; off; len } = + let rec loop n = + if n = len then + () + else + loop (n + eio_getrandom buffer (off + n) (len - n)) + in + in_worker_thread @@ fun () -> + loop 0 + +let fstat fd = + Fd.use_exn "fstat" fd Unix.LargeFile.fstat + +let lstat path = + in_worker_thread @@ fun () -> + Unix.LargeFile.lstat path + +let realpath path = + in_worker_thread @@ fun () -> + Unix.realpath path + +let read_entries h = + let rec aux acc = + match Unix.readdir h with + | "." | ".." -> aux acc + | leaf -> aux (leaf :: acc) + | exception End_of_file -> Array.of_list acc + in + aux [] + +let readdir path = + in_worker_thread @@ fun () -> + let h = Unix.opendir path in + match read_entries h with + | r -> Unix.closedir h; r + | exception ex -> + let bt = Printexc.get_raw_backtrace () in + Unix.closedir h; Printexc.raise_with_backtrace ex bt + +external eio_readv : Unix.file_descr -> Cstruct.t array -> int = "caml_eio_windows_readv" + +external eio_preadv : Unix.file_descr -> Cstruct.t array -> Optint.Int63.t -> int = "caml_eio_windows_preadv" +external eio_pwritev : Unix.file_descr -> Cstruct.t array -> Optint.Int63.t -> int = "caml_eio_windows_pwritev" + +let readv fd bufs = + Fd.use_exn "readv" fd @@ fun fd -> + do_nonblocking Read (fun fd -> eio_readv fd bufs) fd + +let writev fd bufs = + Fd.use_exn "writev" fd @@ fun fd -> + do_nonblocking Write (fun fd -> Unix_cstruct.writev fd bufs) fd + +let preadv ~file_offset fd bufs = + Fd.use_exn "preadv" fd @@ fun fd -> + do_nonblocking Read (fun fd -> eio_preadv fd bufs file_offset) fd + +let pwritev ~file_offset fd bufs = + Fd.use_exn "pwritev" fd @@ fun fd -> + do_nonblocking Write (fun fd -> eio_pwritev fd bufs file_offset) fd + +module Open_flags = struct + type t = int + + let rdonly = Config.o_rdonly + let rdwr = Config.o_rdwr + let wronly = Config.o_wronly + let append = Config.o_append + let cloexec = Config.o_noinherit + let creat = Config.o_creat + let excl = Config.o_excl + (* let directory = Config.o_directory + let dsync = Config.o_dsync + let noctty = Config.o_noctty + let nofollow = Config.o_nofollow *) + (* let nonblock = Config.o_nonblock *) + (* let sync = Config.o_sync *) + let trunc = Config.o_trunc + + let empty = 0 + let ( + ) = ( lor ) +end + +let rec with_dirfd op dirfd fn = + match dirfd with + | None -> fn (Obj.magic (failwith "TODO AT_FDCWD") : Unix.file_descr) + | Some dirfd -> Fd.use_exn op dirfd fn + | exception Unix.Unix_error(Unix.EINTR, _, "") -> with_dirfd op dirfd fn + +external eio_openat : Unix.file_descr -> string -> Open_flags.t -> int -> Unix.file_descr = "caml_eio_windows_openat" + +let openat ?dirfd ~sw ~mode path flags = + with_dirfd "openat" dirfd @@ fun dirfd -> + Switch.check sw; + in_worker_thread (fun () -> eio_openat dirfd path Open_flags.(flags + cloexec (* + nonblock *)) mode) + |> Fd.of_unix ~sw ~blocking:false ~close_unix:true + +external eio_mkdirat : Unix.file_descr -> string -> Unix.file_perm -> unit = "caml_eio_windows_mkdirat" + +let mkdir ?dirfd ~mode path = + with_dirfd "mkdirat" dirfd @@ fun dirfd -> + in_worker_thread @@ fun () -> + eio_mkdirat dirfd path mode + +external eio_unlinkat : Unix.file_descr -> string -> bool -> unit = "caml_eio_windows_unlinkat" + +let unlink ?dirfd ~dir path = + with_dirfd "unlink" dirfd @@ fun dirfd -> + in_worker_thread @@ fun () -> + eio_unlinkat dirfd path dir + +external eio_renameat : Unix.file_descr -> string -> Unix.file_descr -> string -> unit = "caml_eio_windows_renameat" + +let rename ?old_dir old_path ?new_dir new_path = + with_dirfd "rename-old" old_dir @@ fun old_dir -> + with_dirfd "rename-new" new_dir @@ fun new_dir -> + in_worker_thread @@ fun () -> + eio_renameat old_dir old_path new_dir new_path + +let pipe ~sw = + let unix_r, unix_w = Unix.pipe ~cloexec:true () in + let r = Fd.of_unix ~sw ~blocking:false ~close_unix:true unix_r in + let w = Fd.of_unix ~sw ~blocking:false ~close_unix:true unix_w in + Unix.set_nonblock unix_r; + Unix.set_nonblock unix_w; + r, w + +module Process = struct + type t = { + pid : int; + exit_status : Unix.process_status Promise.t; + } + + let exit_status t = t.exit_status + let pid t = t.pid + + module Fork_action = Eio_unix.Private.Fork_action + + (* Read a (typically short) error message from a child process. *) + let rec read_response fd = + let buf = Bytes.create 256 in + match read fd buf 0 (Bytes.length buf) with + | 0 -> "" + | n -> Bytes.sub_string buf 0 n ^ read_response fd + + let with_pipe fn = + Switch.run @@ fun sw -> + let r, w = pipe ~sw in + fn r w + + let signal t signal = + (* The lock here ensures we don't signal the PID after reaping it. *) + Children.with_lock @@ fun () -> + if not (Promise.is_resolved t.exit_status) then ( + Unix.kill t.pid signal + ) + + external eio_spawn : Unix.file_descr -> Eio_unix.Private.Fork_action.c_action list -> int = "caml_eio_windows_spawn" + + let spawn ~sw actions = + with_pipe @@ fun errors_r errors_w -> + Eio_unix.Private.Fork_action.with_actions actions @@ fun c_actions -> + Switch.check sw; + let t = + (* We take the lock to ensure that the signal handler won't reap the + process before we've registered it. *) + Children.with_lock (fun () -> + let pid = + Fd.use_exn "errors-w" errors_w @@ fun errors_w -> + eio_spawn errors_w c_actions + in + Fd.close errors_w; + { pid; exit_status = Children.register pid } + ) + in + let hook = Switch.on_release_cancellable sw (fun () -> signal t Sys.sigkill) in + (* Removing the hook must be done from our own domain, not from the signal handler, + so fork a fiber to deal with that. If the switch gets cancelled then this won't + run, but then the [on_release] handler will run the hook soon anyway. *) + Fiber.fork_daemon ~sw (fun () -> + ignore (Promise.await t.exit_status : Unix.process_status); + Switch.remove_hook hook; + `Stop_daemon + ); + (* Check for errors starting the process. *) + match read_response errors_r with + | "" -> t (* Success! Execing the child closed [errors_w] and we got EOF. *) + | err -> failwith err +end diff --git a/lib_eio_windows/low_level.mli b/lib_eio_windows/low_level.mli new file mode 100755 index 000000000..967515e1a --- /dev/null +++ b/lib_eio_windows/low_level.mli @@ -0,0 +1,102 @@ +(** This module provides an effects-based API for calling POSIX functions. + + Normally it's better to use the cross-platform {!Eio} APIs instead, + which uses these functions automatically where appropriate. + + These functions mostly copy the POSIX APIs directly, except that: + + + They suspend the calling fiber instead of returning [EAGAIN] or similar. + + They handle [EINTR] by automatically restarting the call. + + They wrap {!Unix.file_descr} in {!Fd}, to avoid use-after-close bugs. + + They attach new FDs to switches, to avoid resource leaks. *) + +open Eio.Std + +type fd := Eio_unix.Fd.t + +module Fd = Fd +[@@deprecated "Use Eio_unix.Fd instead"] + +val await_readable : fd -> unit +val await_writable : fd -> unit + +val sleep_until : Mtime.t -> unit + +val read : fd -> bytes -> int -> int -> int +val read_cstruct : fd -> Cstruct.t -> int +val write : fd -> bytes -> int -> int -> int + +val socket : sw:Switch.t -> Unix.socket_domain -> Unix.socket_type -> int -> fd +val connect : fd -> Unix.sockaddr -> unit +val accept : sw:Switch.t -> fd -> fd * Unix.sockaddr + +val shutdown : fd -> Unix.shutdown_command -> unit + +val recv_msg : fd -> bytes -> int * Unix.sockaddr +val send_msg : fd -> dst:Unix.sockaddr -> bytes -> int + +val getrandom : Cstruct.t -> unit + +val fstat : fd -> Unix.LargeFile.stats +val lstat : string -> Unix.LargeFile.stats + +val realpath : string -> string + +val mkdir : ?dirfd:fd -> mode:int -> string -> unit +val unlink : ?dirfd:fd -> dir:bool -> string -> unit +val rename : ?old_dir:fd -> string -> ?new_dir:fd -> string -> unit + +val readdir : string -> string array + +val readv : fd -> Cstruct.t array -> int +val writev : fd -> Cstruct.t list -> unit + +val preadv : file_offset:Optint.Int63.t -> fd -> Cstruct.t array -> int +val pwritev : file_offset:Optint.Int63.t -> fd -> Cstruct.t array -> int + +val pipe : sw:Switch.t -> fd * fd + +module Open_flags : sig + type t + + val rdonly : t + val rdwr : t + val wronly : t + val append : t + val creat : t + val excl : t + val trunc : t + + val empty : t + val ( + ) : t -> t -> t +end + +val openat : ?dirfd:fd -> sw:Switch.t -> mode:int -> string -> Open_flags.t -> fd +(** Note: the returned FD is always non-blocking and close-on-exec. *) + +module Process : sig + type t + (** A child process. *) + + module Fork_action = Eio_unix.Private.Fork_action + (** Setup actions to perform in the child process. *) + + val spawn : sw:Switch.t -> Fork_action.t list -> t + (** [spawn ~sw actions] forks a child process, which executes [actions]. + The last action should be {!Fork_action.execve}. + + You will typically want to do [Promise.await (exit_status child)] after this. + + @param sw The child will be sent {!Sys.sigkill} if [sw] finishes. *) + + val signal : t -> int -> unit + (** [signal t x] sends signal [x] to [t]. + + This is similar to doing [Unix.kill t.pid x], + except that it ensures no signal is sent after [t] has been reaped. *) + + val pid : t -> int + + val exit_status : t -> Unix.process_status Promise.t + (** [exit_status t] is a promise for the process's exit status. *) +end diff --git a/lib_eio_windows/net.ml b/lib_eio_windows/net.ml new file mode 100755 index 000000000..27921307d --- /dev/null +++ b/lib_eio_windows/net.ml @@ -0,0 +1,158 @@ +open Eio.Std + +module Fd = Eio_unix.Fd + +let socket_domain_of = function + | `Unix _ -> Unix.PF_UNIX + | `UdpV4 -> Unix.PF_INET + | `UdpV6 -> Unix.PF_INET6 + | `Udp (host, _) + | `Tcp (host, _) -> + Eio.Net.Ipaddr.fold host + ~v4:(fun _ -> Unix.PF_INET) + ~v6:(fun _ -> Unix.PF_INET6) + +let listening_socket ~hook fd = object + inherit Eio.Net.listening_socket + + method close = + Switch.remove_hook hook; + Fd.close fd + + method accept ~sw = + let client, client_addr = Err.run (Low_level.accept ~sw) fd in + let client_addr = match client_addr with + | Unix.ADDR_UNIX path -> `Unix path + | Unix.ADDR_INET (host, port) -> `Tcp (Eio_unix.Ipaddr.of_unix host, port) + in + let flow = (Flow.of_fd client :> ) in + flow, client_addr + + method! probe : type a. a Eio.Generic.ty -> a option = function + | Eio_unix.Resource.FD -> Some fd + | _ -> None +end + +(* todo: would be nice to avoid copying between bytes and cstructs here *) +let udp_socket sock = object + inherit Eio.Net.datagram_socket + + method close = Fd.close sock + + method send sockaddr buf = + let addr = match sockaddr with + | `Udp (host, port) -> + let host = Eio_unix.Ipaddr.to_unix host in + Unix.ADDR_INET (host, port) + in + let sent = Err.run (Low_level.send_msg sock ~dst:addr) (Cstruct.to_bytes buf) in + assert (sent = Cstruct.length buf) + + method recv buf = + let b = Bytes.create (Cstruct.length buf) in + let recv, addr = Err.run (Low_level.recv_msg sock) b in + Cstruct.blit_from_bytes b 0 buf 0 recv; + match addr with + | Unix.ADDR_INET (inet, port) -> + `Udp (Eio_unix.Ipaddr.of_unix inet, port), recv + | Unix.ADDR_UNIX _ -> + raise (Failure "Expected INET UDP socket address but got Unix domain socket address.") +end + +(* https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml *) +let getaddrinfo ~service node = + let to_eio_sockaddr_t {Unix.ai_family; ai_addr; ai_socktype; ai_protocol; _ } = + match ai_family, ai_socktype, ai_addr with + | (Unix.PF_INET | PF_INET6), + (Unix.SOCK_STREAM | SOCK_DGRAM), + Unix.ADDR_INET (inet_addr,port) -> ( + match ai_protocol with + | 6 -> Some (`Tcp (Eio_unix.Ipaddr.of_unix inet_addr, port)) + | 17 -> Some (`Udp (Eio_unix.Ipaddr.of_unix inet_addr, port)) + | _ -> None) + | _ -> None + in + Err.run Eio_unix.run_in_systhread @@ fun () -> + let rec aux () = + try + Unix.getaddrinfo node service [] + |> List.filter_map to_eio_sockaddr_t + with Unix.Unix_error (EINTR, _, _) -> aux () + in + aux () + +let listen ~reuse_addr ~reuse_port ~backlog ~sw (listen_addr : Eio.Net.Sockaddr.stream) = + let socket_type, addr = + match listen_addr with + | `Unix path -> + if reuse_addr then ( + match Low_level.lstat path with + | Unix.{ st_kind = S_SOCK; _ } -> Unix.unlink path + | _ -> () + | exception Unix.Unix_error (Unix.ENOENT, _, _) -> () + | exception Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap code name arg + ); + Unix.SOCK_STREAM, Unix.ADDR_UNIX path + | `Tcp (host, port) -> + let host = Eio_unix.Ipaddr.to_unix host in + Unix.SOCK_STREAM, Unix.ADDR_INET (host, port) + in + let sock = Low_level.socket ~sw (socket_domain_of listen_addr) socket_type 0 in + (* For Unix domain sockets, remove the path when done (except for abstract sockets). *) + let hook = + match listen_addr with + | `Unix path when String.length path > 0 && path.[0] <> Char.chr 0 -> + Switch.on_release_cancellable sw (fun () -> Unix.unlink path) + | `Unix _ | `Tcp _ -> + Switch.null_hook + in + Fd.use_exn "listen" sock (fun fd -> + if reuse_addr then + Unix.setsockopt fd Unix.SO_REUSEADDR true; + if reuse_port then + Unix.setsockopt fd Unix.SO_REUSEPORT true; + Unix.bind fd addr; + Unix.listen fd backlog; + ); + listening_socket ~hook sock + +let connect ~sw connect_addr = + let socket_type, addr = + match connect_addr with + | `Unix path -> Unix.SOCK_STREAM, Unix.ADDR_UNIX path + | `Tcp (host, port) -> + let host = Eio_unix.Ipaddr.to_unix host in + Unix.SOCK_STREAM, Unix.ADDR_INET (host, port) + in + let sock = Low_level.socket ~sw (socket_domain_of connect_addr) socket_type 0 in + try + Low_level.connect sock addr; + (Flow.of_fd sock :> ) + with Unix.Unix_error (code, name, arg) -> raise (Err.wrap code name arg) + +let datagram_socket ~reuse_addr ~reuse_port ~sw saddr = + let sock = Low_level.socket ~sw (socket_domain_of saddr) Unix.SOCK_DGRAM 0 in + begin match saddr with + | `Udp (host, port) -> + let host = Eio_unix.Ipaddr.to_unix host in + let addr = Unix.ADDR_INET (host, port) in + Fd.use_exn "datagram_socket" sock (fun fd -> + if reuse_addr then + Unix.setsockopt fd Unix.SO_REUSEADDR true; + if reuse_port then + Unix.setsockopt fd Unix.SO_REUSEPORT true; + Unix.bind fd addr + ) + | `UdpV4 | `UdpV6 -> () + end; + udp_socket sock + +let v = object + inherit Eio.Net.t + + method listen = listen + method connect = connect + method datagram_socket = datagram_socket + method getaddrinfo = getaddrinfo + method getnameinfo = Eio_unix.getnameinfo +end diff --git a/lib_eio_windows/sched.ml b/lib_eio_windows/sched.ml new file mode 100755 index 000000000..f06d986d3 --- /dev/null +++ b/lib_eio_windows/sched.ml @@ -0,0 +1,364 @@ +(* + * Copyright (C) 2023 Thomas Leonard + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + *) + +module Suspended = Eio_utils.Suspended +module Zzz = Eio_utils.Zzz +module Lf_queue = Eio_utils.Lf_queue +module Fiber_context = Eio.Private.Fiber_context +module Ctf = Eio.Private.Ctf +module Rcfd = Eio_unix.Private.Rcfd + +type exit = [`Exit_scheduler] + +let system_thread = Ctf.mint_id () + +(* The type of items in the run queue. *) +type runnable = + | IO : runnable (* Reminder to check for IO *) + | Thread : 'a Suspended.t * 'a -> runnable (* Resume a fiber with a result value *) + | Failed_thread : 'a Suspended.t * exn -> runnable (* Resume a fiber with an exception *) + +(* For each FD we track which fibers are waiting for it to become readable/writeable. *) +type fd_event_waiters = { + read : unit Suspended.t Lwt_dllist.t; + write : unit Suspended.t Lwt_dllist.t; +} + +module FdMap = Map.Make (Int) + +(* A structure for storing the file descriptors for select. *) +type poll = { + mutable to_read : Unix.file_descr FdMap.t; + mutable to_write : Unix.file_descr FdMap.t; +} + +type t = { + (* The queue of runnable fibers ready to be resumed. Note: other domains can also add work items here. *) + run_q : runnable Lf_queue.t; + + poll : poll; + mutable poll_maxi : int; (* The highest index ever used in [poll]. *) + fd_map : (Unix.file_descr, fd_event_waiters) Hashtbl.t; + + (* When adding to [run_q] from another domain, this domain may be sleeping and so won't see the event. + In that case, [need_wakeup = true] and you must signal using [eventfd]. *) + eventfd : Rcfd.t; (* For sending events. *) + eventfd_r : Unix.file_descr; (* For reading events. *) + + mutable active_ops : int; (* Exit when this is zero and [run_q] and [sleep_q] are empty. *) + + (* If [false], the main thread will check [run_q] before sleeping again + (possibly because an event has been or will be sent to [eventfd]). + It can therefore be set to [false] in either of these cases: + - By the receiving thread because it will check [run_q] before sleeping, or + - By the sending thread because it will signal the main thread later *) + need_wakeup : bool Atomic.t; + + sleep_q: Zzz.t; (* Fibers waiting for timers. *) +} + +(* The message to send to [eventfd] (any character would do). *) +let wake_buffer = Bytes.of_string "!" + +(* This can be called from any systhread (including ones not running Eio), + and also from signal handlers or GC finalizers. It must not take any locks. *) +let wakeup t = + Atomic.set t.need_wakeup false; (* [t] will check [run_q] after getting the event below *) + Rcfd.use t.eventfd + ~if_closed:ignore (* Domain has shut down (presumably after handling the event) *) + (fun fd -> + (* This can fail if the pipe is full, but then a wake up is pending anyway. *) + ignore (Unix.single_write fd wake_buffer 0 1 : int); + ) + +(* Safe to call from anywhere (other systhreads, domains, signal handlers, GC finalizers) *) +let enqueue_thread t k x = + Lf_queue.push t.run_q (Thread (k, x)); + if Atomic.get t.need_wakeup then wakeup t + +(* Safe to call from anywhere (other systhreads, domains, signal handlers, GC finalizers) *) +let enqueue_failed_thread t k ex = + Lf_queue.push t.run_q (Failed_thread (k, ex)); + if Atomic.get t.need_wakeup then wakeup t + +(* Can only be called from our own domain, so no need to check for wakeup. *) +let enqueue_at_head t k = + Lf_queue.push_head t.run_q (Thread (k, ())) + +let get_waiters t fd = + match Hashtbl.find_opt t.fd_map fd with + | Some x -> x + | None -> + let x = { read = Lwt_dllist.create (); write = Lwt_dllist.create () } in + Hashtbl.add t.fd_map fd x; + x + +(* The OS told us that the event pipe is ready. Remove events. *) +let clear_event_fd t = + let buf = Bytes.create 8 in (* Read up to 8 events at a time *) + let got = Unix.read t.eventfd_r buf 0 (Bytes.length buf) in + assert (got > 0) + +(* Update [t.poll]'s entry for [fd] to match [waiters]. *) +let update t waiters fd = + let fdi : int = Obj.magic fd in + let flags = + match not (Lwt_dllist.is_empty waiters.read), + not (Lwt_dllist.is_empty waiters.write) with + | false, false -> `Empty + | true, false -> `R + | false, true -> `W + | true, true -> `RW + in + match flags with + | `Empty -> ( + t.poll.to_read <- FdMap.remove fdi t.poll.to_read; + t.poll.to_write <- FdMap.remove fdi t.poll.to_write; + Hashtbl.remove t.fd_map fd + ) + | `R -> t.poll.to_read <- FdMap.add fdi fd t.poll.to_read + | `W -> t.poll.to_write <- FdMap.add fdi fd t.poll.to_write + | `RW -> + t.poll.to_read <- FdMap.add fdi fd t.poll.to_read; + t.poll.to_write <- FdMap.add fdi fd t.poll.to_write + +let resume t node = + t.active_ops <- t.active_ops - 1; + let k : unit Suspended.t = Lwt_dllist.get node in + Fiber_context.clear_cancel_fn k.fiber; + enqueue_thread t k () + +(* Called when poll indicates that an event we requested for [fd] is ready. *) +let ready t revents fd = + if fd == t.eventfd_r then ( + clear_event_fd t + (* The scheduler will now look at the run queue again and notice any new items. *) + ) else ( + let waiters = Hashtbl.find t.fd_map fd in + let pending = Lwt_dllist.create () in + if List.mem `W revents then + Lwt_dllist.transfer_l waiters.write pending; + if List.mem `R revents then + Lwt_dllist.transfer_l waiters.read pending; + (* If pending has things, it means we modified the waiters, refresh our view *) + if not (Lwt_dllist.is_empty pending) then + update t waiters fd; + Lwt_dllist.iter_node_r (resume t) pending + ) + +(* Switch control to the next ready continuation. + If none is ready, wait until we get an event to wake one and then switch. + Returns only if there is nothing to do and no active operations. *) +let rec next t : [`Exit_scheduler] = + (* Wakeup any paused fibers *) + match Lf_queue.pop t.run_q with + | None -> assert false (* We should always have an IO job, at least *) + | Some Thread (k, v) -> (* We already have a runnable task *) + Fiber_context.clear_cancel_fn k.fiber; + Suspended.continue k v + | Some Failed_thread (k, ex) -> + Fiber_context.clear_cancel_fn k.fiber; + Suspended.discontinue k ex + | Some IO -> (* Note: be sure to re-inject the IO task before continuing! *) + (* This is not a fair scheduler: timers always run before all other IO *) + let now = Mtime_clock.now () in + match Zzz.pop ~now t.sleep_q with + | `Due k -> + Lf_queue.push t.run_q IO; (* Re-inject IO job in the run queue *) + Suspended.continue k () (* A sleeping task is now due *) + | `Wait_until _ | `Nothing as next_due -> + let timeout = + match next_due with + | `Wait_until time -> + let time = Mtime.to_uint64_ns time in + let now = Mtime.to_uint64_ns now in + let diff_ns = Int64.sub time now in + (* Convert to seconds for Unix.select *) + let diff = Int64.(to_float diff_ns) /. 1_000_000_000. in + diff + | `Nothing -> (-1.) + in + if timeout < 0. && t.active_ops = 0 then ( + (* Nothing further can happen at this point. *) + Lf_queue.close t.run_q; (* Just to catch bugs if something tries to enqueue later *) + `Exit_scheduler + ) else ( + Atomic.set t.need_wakeup true; + if Lf_queue.is_empty t.run_q then ( + (* At this point we're not going to check [run_q] again before sleeping. + If [need_wakeup] is still [true], this is fine because we don't promise to do that. + If [need_wakeup = false], a wake-up event will arrive and wake us up soon. *) + Ctf.(note_hiatus Wait_for_work); + let cons _ fd acc = fd :: acc in + let read = FdMap.fold cons t.poll.to_read [] in + let write = FdMap.fold cons t.poll.to_write [] in + match Unix.select read write [] timeout with + | exception Unix.(Unix_error (EINTR, _, _)) -> next t + | readable, writeable, _ -> + Ctf.note_resume system_thread; + Atomic.set t.need_wakeup false; + Lf_queue.push t.run_q IO; (* Re-inject IO job in the run queue *) + List.iter (ready t [ `W ]) writeable; + List.iter (ready t [ `R ]) readable; + next t + ) else ( + (* Someone added a new job while we were setting [need_wakeup] to [true]. + They might or might not have seen that, so we can't be sure they'll send an event. *) + Atomic.set t.need_wakeup false; + Lf_queue.push t.run_q IO; (* Re-inject IO job in the run queue *) + next t + ) + ) + +let with_sched fn = + let run_q = Lf_queue.create () in + Lf_queue.push run_q IO; + let sleep_q = Zzz.create () in + (* Pipe's on Windows cannot be nonblocking through the OCaml API. *) + let eventfd_r, eventfd_w = Unix.socketpair ~cloexec:true Unix.PF_UNIX Unix.SOCK_STREAM 0 in + Unix.set_nonblock eventfd_r; + Unix.set_nonblock eventfd_w; + let eventfd = Rcfd.make eventfd_w in + let cleanup () = + Unix.close eventfd_r; + let was_open = Rcfd.close eventfd in + assert was_open + in + let poll = { to_read = FdMap.empty; to_write = FdMap.empty } in + let fd_map = Hashtbl.create 10 in + let t = { run_q; poll; poll_maxi = (-1); fd_map; eventfd; eventfd_r; + active_ops = 0; need_wakeup = Atomic.make false; sleep_q } in + let eventfd_ri : int = Obj.magic eventfd_r in + t.poll.to_read <- FdMap.add eventfd_ri eventfd_r t.poll.to_read; + if eventfd_ri > t.poll_maxi then + t.poll_maxi <- eventfd_ri; + match fn t with + | x -> cleanup (); x + | exception ex -> + let bt = Printexc.get_raw_backtrace () in + cleanup (); + Printexc.raise_with_backtrace ex bt + +let await_readable t (k : unit Suspended.t) fd = + match Fiber_context.get_error k.fiber with + | Some e -> Suspended.discontinue k e + | None -> + t.active_ops <- t.active_ops + 1; + let waiters = get_waiters t fd in + let was_empty = Lwt_dllist.is_empty waiters.read in + let node = Lwt_dllist.add_l k waiters.read in + if was_empty then update t waiters fd; + Fiber_context.set_cancel_fn k.fiber (fun ex -> + Lwt_dllist.remove node; + t.active_ops <- t.active_ops - 1; + enqueue_failed_thread t k ex + ); + next t + +let await_writable t (k : unit Suspended.t) fd = + match Fiber_context.get_error k.fiber with + | Some e -> Suspended.discontinue k e + | None -> + t.active_ops <- t.active_ops + 1; + let waiters = get_waiters t fd in + let was_empty = Lwt_dllist.is_empty waiters.write in + let node = Lwt_dllist.add_l k waiters.write in + if was_empty then update t waiters fd; + Fiber_context.set_cancel_fn k.fiber (fun ex -> + Lwt_dllist.remove node; + t.active_ops <- t.active_ops - 1; + enqueue_failed_thread t k ex + ); + next t + +let get_enqueue t k = function + | Ok v -> enqueue_thread t k v + | Error ex -> enqueue_failed_thread t k ex + +let await_timeout t (k : unit Suspended.t) time = + match Fiber_context.get_error k.fiber with + | Some e -> Suspended.discontinue k e + | None -> + let node = Zzz.add t.sleep_q time k in + Fiber_context.set_cancel_fn k.fiber (fun ex -> + Zzz.remove t.sleep_q node; + enqueue_failed_thread t k ex + ); + next t + +let with_op t fn x = + t.active_ops <- t.active_ops + 1; + match fn x with + | r -> + t.active_ops <- t.active_ops - 1; + r + | exception ex -> + t.active_ops <- t.active_ops - 1; + raise ex + +[@@@alert "-unstable"] + +type _ Effect.t += Enter : (t -> 'a Eio_utils.Suspended.t -> [`Exit_scheduler]) -> 'a Effect.t +let enter fn = Effect.perform (Enter fn) + +let run ~extra_effects t main x = + let rec fork ~new_fiber:fiber fn = + let open Effect.Deep in + Ctf.note_switch (Fiber_context.tid fiber); + match_with fn () + { retc = (fun () -> Fiber_context.destroy fiber; next t); + exnc = (fun ex -> + Fiber_context.destroy fiber; + Printexc.raise_with_backtrace ex (Printexc.get_raw_backtrace ()) + ); + effc = fun (type a) (e : a Effect.t) -> + match e with + | Enter fn -> Some (fun k -> + match Fiber_context.get_error fiber with + | Some e -> discontinue k e + | None -> fn t { Suspended.k; fiber } + ) + | Eio.Private.Effects.Get_context -> Some (fun k -> continue k fiber) + | Eio.Private.Effects.Suspend f -> Some (fun k -> + let k = { Suspended.k; fiber } in + let enqueue = get_enqueue t k in + f fiber enqueue; + next t + ) + | Eio.Private.Effects.Fork (new_fiber, f) -> Some (fun k -> + let k = { Suspended.k; fiber } in + enqueue_at_head t k; + fork ~new_fiber f + ) + | Eio_unix.Private.Await_readable fd -> Some (fun k -> + await_readable t { Suspended.k; fiber } fd + ) + | Eio_unix.Private.Await_writable fd -> Some (fun k -> + await_writable t { Suspended.k; fiber } fd + ) + | e -> extra_effects.Effect.Deep.effc e + } + in + let result = ref None in + let `Exit_scheduler = + let new_fiber = Fiber_context.make_root () in + fork ~new_fiber (fun () -> + result := Some (with_op t main x); + ) + in + match !result with + | Some x -> x + | None -> failwith "BUG in scheduler: deadlock detected" diff --git a/lib_eio_windows/sched.mli b/lib_eio_windows/sched.mli new file mode 100755 index 000000000..01fc1cbcb --- /dev/null +++ b/lib_eio_windows/sched.mli @@ -0,0 +1,44 @@ +(** The scheduler keeps track of all suspended fibers and resumes them as appropriate. + + Each Eio domain has one scheduler, which keeps a queue of runnable + processes plus a record of all fibers waiting for IO operations to complete. *) + +type t + +type exit +(** This is equivalent to [unit], but indicates that a function returning this will call {!next} + and so does not return until the whole event loop is finished. Such functions should normally + be called in tail position. *) + +val with_sched : (t -> 'a) -> 'a +(** [with_sched fn] sets up a scheduler and calls [fn t]. + Typically [fn] will call {!run}. + When [fn] returns, the scheduler's resources are freed. *) + +val run : + extra_effects:exit Effect.Deep.effect_handler -> + t -> ('a -> 'b) -> 'a -> 'b [@@alert "-unstable"] +(** [run ~extra_effects t f x] starts an event loop using [t] and runs [f x] as the root fiber within it. + + Unknown effects are passed to [extra_effects]. *) + +val next : t -> exit +(** [next t] asks the scheduler to transfer control to the next runnable fiber, + or wait for an event from the OS if there is none. This should normally be + called in tail position from an effect handler. *) + +val await_readable : t -> unit Eio_utils.Suspended.t -> Unix.file_descr -> exit +(** [await_readable t k fd] arranges for [k] to be resumed when [fd] is ready for reading. *) + +val await_writable : t -> unit Eio_utils.Suspended.t -> Unix.file_descr -> exit +(** [await_readable t k fd] arranges for [k] to be resumed when [fd] is ready for writing. *) + +val await_timeout : t -> unit Eio_utils.Suspended.t -> Mtime.t -> exit +(** [await_timeout t k time] adds [time, k] to the timer. + + When [time] is reached, [k] is resumed. Cancelling [k] removes the entry from the timer. *) + +val enter : (t -> 'a Eio_utils.Suspended.t -> exit) -> 'a +(** [enter fn] suspends the current fiber and runs [fn t k] in the scheduler's context. + + [fn] should either resume [k] immediately itself, or call one of the [await_*] functions above. *) diff --git a/lib_eio_windows/test/dune b/lib_eio_windows/test/dune new file mode 100755 index 000000000..47626ef77 --- /dev/null +++ b/lib_eio_windows/test/dune @@ -0,0 +1,5 @@ +(test + (name test) + (package eio_windows) + (enabled_if (= %{os_type} "Win32")) + (libraries alcotest eio.mock eio_windows)) diff --git a/lib_eio_windows/test/test.ml b/lib_eio_windows/test/test.ml new file mode 100755 index 000000000..eac63960a --- /dev/null +++ b/lib_eio_windows/test/test.ml @@ -0,0 +1,67 @@ +module Timeout = struct + let test clock () = + let t0 = Unix.gettimeofday () in + Eio.Time.sleep clock 0.01; + let t1 = Unix.gettimeofday () in + let diff = t1 -. t0 in + if diff >= 0.01 then () else Alcotest.failf "Expected bigger difference than %f" diff + + + let tests env = [ + "timeout", `Quick, test env#clock + ] +end + +module Net = struct + open Eio.Std + + let read_all flow = + let b = Buffer.create 100 in + Eio.Flow.copy flow (Eio.Flow.buffer_sink b); + Buffer.contents b + + let run_client ~sw ~net ~addr = + traceln "Connecting to server..."; + let flow = Eio.Net.connect ~sw net addr in + Eio.traceln "connected"; + Eio.Flow.copy_string "Hello from client" flow; + Eio.Flow.shutdown flow `Send; + let msg = read_all flow in + msg + + let run_server ~sw msg socket = + Eio.Net.accept_fork socket ~sw (fun flow _addr -> + traceln "Server accepted connection from client"; + Fun.protect (fun () -> + let msg = read_all flow in + traceln "Server received: %S" msg + ) ~finally:(fun () -> Eio.Flow.copy_string msg flow) + ) + ~on_error:(function + | ex -> traceln "Error handling connection: %s" (Printexc.to_string ex) + ) + + let test_client_server env () = + Eio.Switch.run @@ fun sw -> + let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, 8081) in + let server = Eio.Net.listen env#net ~sw ~reuse_addr:true ~backlog:5 addr in + let msg = "From the server" in + Fiber.both + (fun () -> run_server ~sw msg server) + (fun () -> + let client_msg = run_client ~sw ~net:env#net ~addr in + Alcotest.(check string) "same message" msg client_msg + ) + + + let tests env = [ + "server-client", `Quick, test_client_server env + ] +end + +let () = + Eio_windows.run @@ fun env -> + Alcotest.run "eio_windows" [ + "net", Net.tests env; + "timeout", Timeout.tests env + ] \ No newline at end of file diff --git a/lib_eio_windows/time.ml b/lib_eio_windows/time.ml new file mode 100755 index 000000000..b11aaffa3 --- /dev/null +++ b/lib_eio_windows/time.ml @@ -0,0 +1,19 @@ +let mono_clock = object + inherit Eio.Time.Mono.t + + method now = Mtime_clock.now () + + method sleep_until = Low_level.sleep_until +end + +let clock = object + inherit Eio.Time.clock + + method now = Unix.gettimeofday () + + method sleep_until time = + (* todo: use the realtime clock directly instead of converting to monotonic time. + That is needed to handle adjustments to the system clock correctly. *) + let d = time -. Unix.gettimeofday () in + Eio.Time.Mono.sleep mono_clock d +end diff --git a/tests/dune b/tests/dune index a4836c50f..31aec547a 100644 --- a/tests/dune +++ b/tests/dune @@ -1,5 +1,6 @@ (mdx (package eio_main) + (enabled_if (<> %{os_type} "Win32")) (deps (env_var "EIO_BACKEND") (package eio_main))) From cf6ba07b45d3caeefc6a737870efb5beadd1b245 Mon Sep 17 00:00:00 2001 From: Patrick Ferris Date: Sat, 29 Apr 2023 10:48:36 +0100 Subject: [PATCH 02/18] Remove unused process module --- lib_eio_windows/low_level.ml | 63 ----------------------------------- lib_eio_windows/low_level.mli | 27 --------------- 2 files changed, 90 deletions(-) diff --git a/lib_eio_windows/low_level.ml b/lib_eio_windows/low_level.ml index 73714c36c..b70632fe8 100755 --- a/lib_eio_windows/low_level.ml +++ b/lib_eio_windows/low_level.ml @@ -217,66 +217,3 @@ let pipe ~sw = Unix.set_nonblock unix_r; Unix.set_nonblock unix_w; r, w - -module Process = struct - type t = { - pid : int; - exit_status : Unix.process_status Promise.t; - } - - let exit_status t = t.exit_status - let pid t = t.pid - - module Fork_action = Eio_unix.Private.Fork_action - - (* Read a (typically short) error message from a child process. *) - let rec read_response fd = - let buf = Bytes.create 256 in - match read fd buf 0 (Bytes.length buf) with - | 0 -> "" - | n -> Bytes.sub_string buf 0 n ^ read_response fd - - let with_pipe fn = - Switch.run @@ fun sw -> - let r, w = pipe ~sw in - fn r w - - let signal t signal = - (* The lock here ensures we don't signal the PID after reaping it. *) - Children.with_lock @@ fun () -> - if not (Promise.is_resolved t.exit_status) then ( - Unix.kill t.pid signal - ) - - external eio_spawn : Unix.file_descr -> Eio_unix.Private.Fork_action.c_action list -> int = "caml_eio_windows_spawn" - - let spawn ~sw actions = - with_pipe @@ fun errors_r errors_w -> - Eio_unix.Private.Fork_action.with_actions actions @@ fun c_actions -> - Switch.check sw; - let t = - (* We take the lock to ensure that the signal handler won't reap the - process before we've registered it. *) - Children.with_lock (fun () -> - let pid = - Fd.use_exn "errors-w" errors_w @@ fun errors_w -> - eio_spawn errors_w c_actions - in - Fd.close errors_w; - { pid; exit_status = Children.register pid } - ) - in - let hook = Switch.on_release_cancellable sw (fun () -> signal t Sys.sigkill) in - (* Removing the hook must be done from our own domain, not from the signal handler, - so fork a fiber to deal with that. If the switch gets cancelled then this won't - run, but then the [on_release] handler will run the hook soon anyway. *) - Fiber.fork_daemon ~sw (fun () -> - ignore (Promise.await t.exit_status : Unix.process_status); - Switch.remove_hook hook; - `Stop_daemon - ); - (* Check for errors starting the process. *) - match read_response errors_r with - | "" -> t (* Success! Execing the child closed [errors_w] and we got EOF. *) - | err -> failwith err -end diff --git a/lib_eio_windows/low_level.mli b/lib_eio_windows/low_level.mli index 967515e1a..267960f69 100755 --- a/lib_eio_windows/low_level.mli +++ b/lib_eio_windows/low_level.mli @@ -73,30 +73,3 @@ end val openat : ?dirfd:fd -> sw:Switch.t -> mode:int -> string -> Open_flags.t -> fd (** Note: the returned FD is always non-blocking and close-on-exec. *) - -module Process : sig - type t - (** A child process. *) - - module Fork_action = Eio_unix.Private.Fork_action - (** Setup actions to perform in the child process. *) - - val spawn : sw:Switch.t -> Fork_action.t list -> t - (** [spawn ~sw actions] forks a child process, which executes [actions]. - The last action should be {!Fork_action.execve}. - - You will typically want to do [Promise.await (exit_status child)] after this. - - @param sw The child will be sent {!Sys.sigkill} if [sw] finishes. *) - - val signal : t -> int -> unit - (** [signal t x] sends signal [x] to [t]. - - This is similar to doing [Unix.kill t.pid x], - except that it ensures no signal is sent after [t] has been reaped. *) - - val pid : t -> int - - val exit_status : t -> Unix.process_status Promise.t - (** [exit_status t] is a promise for the process's exit status. *) -end From e6092958a7cbc592d77f665a92b76fc597660661 Mon Sep 17 00:00:00 2001 From: Patrick Ferris Date: Sat, 29 Apr 2023 10:51:26 +0100 Subject: [PATCH 03/18] Remove deprecated FD module --- lib_eio_windows/fd.ml | 16 ------------ lib_eio_windows/fd.mli | 49 ----------------------------------- lib_eio_windows/low_level.ml | 4 +-- lib_eio_windows/low_level.mli | 3 --- 4 files changed, 2 insertions(+), 70 deletions(-) delete mode 100755 lib_eio_windows/fd.ml delete mode 100755 lib_eio_windows/fd.mli diff --git a/lib_eio_windows/fd.ml b/lib_eio_windows/fd.ml deleted file mode 100755 index fec405e81..000000000 --- a/lib_eio_windows/fd.ml +++ /dev/null @@ -1,16 +0,0 @@ -(* Deprecated *) - -include Eio_unix.Fd - -let compare = Stdlib.compare - -type has_fd = Eio_unix.Resource.t - -let to_unix op t = - match op with - | `Take -> remove t |> Option.get - | `Peek -> use_exn "to_unix" t Fun.id - -let of_unix ~sw ~blocking ~close_unix fd = of_unix ~sw ~blocking ~close_unix fd - -let get_fd_opt = Eio_unix.Resource.fd_opt diff --git a/lib_eio_windows/fd.mli b/lib_eio_windows/fd.mli deleted file mode 100755 index d58bf2ca4..000000000 --- a/lib_eio_windows/fd.mli +++ /dev/null @@ -1,49 +0,0 @@ -(** A safe wrapper for {!Unix.file_descr}. *) - -open Eio.Std - -type t = Eio_unix.Fd.t -(** A wrapper around a {!Unix.file_descr}. *) - -val compare : t -> t -> int - -val of_unix : sw:Switch.t -> blocking:bool -> close_unix:bool -> Unix.file_descr -> t -(** [of_unix ~sw ~blocking ~close_unix fd] wraps [fd]. - - @param sw Close [fd] automatically when [sw] is finished. - @param blocking Indicates whether [fd] is in blocking mode. - Normally you should call [Unix.set_nonblock fd] first and pass [false] here. - @param close_unix Whether {!close} also closes [fd] (this should normally be [true]). *) - -val use_exn : string -> t -> (Unix.file_descr -> 'a) -> 'a -(** [use_exn op t fn] calls [fn wrapped_fd], ensuring that [wrapped_fd] will not be closed - before [fn] returns. - - If [t] is already closed, it raises an exception, using [op] as the name of the failing operation. *) - -val close : t -> unit -(** [close t] marks [t] as closed, so that {!use_exn} can no longer be used to start new operations. - - The wrapped FD will be closed once all current users of the FD have finished (unless [close_unix = false]). - - @raise Invalid_argument if [t] is closed by another fiber first. *) - -val is_blocking : t -> bool -(** [is_blocking t] returns the value of [blocking] passed to {!of_unix}. *) - -val stdin : t -val stdout : t -val stderr : t - -val to_unix : [`Peek | `Take] -> t -> Unix.file_descr -(** [to_unix `Take t] closes [t] without closing the wrapped FD, which it returns to the caller once all operations on it have finished. - - [to_unix `Peek t] returns the wrapped FD directly. You must ensure that it is not closed while using it. *) - -type has_fd = < fd : t > -(** Resources that have FDs are sub-types of [has_fd]. *) - -val get_fd_opt : #Eio.Generic.t -> t option -(** [get_fd_opt r] returns the [t] being wrapped by a resource, if any. - - This just probes [r] using {!FD}. *) diff --git a/lib_eio_windows/low_level.ml b/lib_eio_windows/low_level.ml index b70632fe8..7e35d7b88 100755 --- a/lib_eio_windows/low_level.ml +++ b/lib_eio_windows/low_level.ml @@ -11,11 +11,11 @@ open Eio.Std type ty = Read | Write -module Fd = Fd - (* todo: keeping a pool of workers is probably faster *) let in_worker_thread = Eio_unix.run_in_systhread +module Fd = Eio_unix.Fd + let await_readable fd = Fd.use_exn "await_readable" fd @@ fun fd -> Sched.enter @@ fun t k -> diff --git a/lib_eio_windows/low_level.mli b/lib_eio_windows/low_level.mli index 267960f69..8765bc28d 100755 --- a/lib_eio_windows/low_level.mli +++ b/lib_eio_windows/low_level.mli @@ -14,9 +14,6 @@ open Eio.Std type fd := Eio_unix.Fd.t -module Fd = Fd -[@@deprecated "Use Eio_unix.Fd instead"] - val await_readable : fd -> unit val await_writable : fd -> unit From a9e2943a92cf5b419b4f622a9b71be1543ee2268 Mon Sep 17 00:00:00 2001 From: Patrick Ferris Date: Sat, 29 Apr 2023 10:53:59 +0100 Subject: [PATCH 04/18] Update comments --- lib_eio_windows/eio_windows.ml | 4 ---- lib_eio_windows/eio_windows.mli | 6 +++--- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/lib_eio_windows/eio_windows.ml b/lib_eio_windows/eio_windows.ml index 0e5cb59f5..0e5ac6f18 100755 --- a/lib_eio_windows/eio_windows.ml +++ b/lib_eio_windows/eio_windows.ml @@ -31,10 +31,6 @@ type stdenv = < > let run main = - (* SIGPIPE makes no sense in a modern application. *) - (* These don't work on Windows. *) - (* Sys.(set_signal sigpipe Signal_ignore); *) - (* Sys.(set_signal sigchld (Signal_handle (fun (_:int) -> Children.handle_sigchld ()))); *) let stdin = (Flow.of_fd Eio_unix.Fd.stdin :> ) in let stdout = (Flow.of_fd Eio_unix.Fd.stdout :> ) in let stderr = (Flow.of_fd Eio_unix.Fd.stderr :> ) in diff --git a/lib_eio_windows/eio_windows.mli b/lib_eio_windows/eio_windows.mli index 259970bbc..300c97b61 100755 --- a/lib_eio_windows/eio_windows.mli +++ b/lib_eio_windows/eio_windows.mli @@ -1,4 +1,4 @@ -(** Fallback Eio backend for POSIX systems. *) +(** Fallback Eio backend for Windows using OCaml's [Unix.select]. *) type stdenv = < stdin : ; @@ -13,7 +13,7 @@ type stdenv = < secure_random : Eio.Flow.source; debug : Eio.Debug.t; > -(** An extended version of {!Eio.Stdenv.t} with some extra features available on POSIX systems. *) +(** An extended version of {!Eio.Stdenv.t} with some extra features available on Windows. *) val run : (stdenv -> 'a) -> 'a (** [run main] runs an event loop and calls [main stdenv] inside it. @@ -21,4 +21,4 @@ val run : (stdenv -> 'a) -> 'a For portable code, you should use {!Eio_main.run} instead, which will call this for you if appropriate. *) module Low_level = Low_level -(** Low-level API for making POSIX calls directly. *) +(** Low-level API. *) From 4bd46b5e84fabf0cae0350d22ab3e178617540eb Mon Sep 17 00:00:00 2001 From: Patrick Ferris Date: Sat, 29 Apr 2023 13:58:59 +0100 Subject: [PATCH 05/18] Update CI --- .github/workflows/main.yml | 10 ++++++++-- eio_windows.opam | 4 ++++ eio_windows.opam.template | 4 ++++ 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index cda6bc432..6244b0821 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -47,13 +47,19 @@ jobs: ocaml-compiler: ocaml.5.0.0,ocaml-option-mingw opam-repositories: | dra27: https://github.com/dra27/opam-repository.git#windows-5.0 + duniverse: git+https://github.com/dune-universe/opam-overlays normal: https://github.com/ocaml/opam-repository.git default: https://github.com/ocaml-opam/opam-repository-mingw.git#opam2 - - run: opam pin -yn . && opam install eio_windows --deps-only --with-test + # --with-version=dev is not available, and --with-test also tries running tests for packages (like MDX) which fail... + - run: | + opam pin -yn eio.dev . + opam pin -yn eio_windows.dev . + opam install ocamlfind.1.9.5 alcotest mdx crowbar -y + opam install eio eio_windows --deps-only - run: opam exec -- dune runtest docker: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - name: Build the Docker image - run: docker build . + run: docker build . \ No newline at end of file diff --git a/eio_windows.opam b/eio_windows.opam index 9ff9f87bb..8676ffdbd 100755 --- a/eio_windows.opam +++ b/eio_windows.opam @@ -31,5 +31,9 @@ build: [ ] dev-repo: "git+https://github.com/ocaml-multicore/eio.git" pin-depends: [ + # Removes base bytes for crowbar + [ "ocplib-endian.dev" "git+https://github.com/Leonidas-from-XIV/ocplib-endian#fda4d5525063c8444020be369c63de23d39c246e" ] + # Needed for the cstruct read and writes without copying + [ "cstruct.dev" "git+https://github.com/djs55/ocaml-cstruct#471ca03b49b3a372945fcf13c89e0447a8bd3932" ] [ "cstruct-unix.dev" "git+https://github.com/djs55/ocaml-cstruct#471ca03b49b3a372945fcf13c89e0447a8bd3932" ] ] \ No newline at end of file diff --git a/eio_windows.opam.template b/eio_windows.opam.template index 5ffe99e8b..7f38c1a6a 100755 --- a/eio_windows.opam.template +++ b/eio_windows.opam.template @@ -1,3 +1,7 @@ pin-depends: [ + # Removes base bytes for crowbar + [ "ocplib-endian.dev" "git+https://github.com/Leonidas-from-XIV/ocplib-endian#fda4d5525063c8444020be369c63de23d39c246e" ] + # Needed for the cstruct read and writes without copying + [ "cstruct.dev" "git+https://github.com/djs55/ocaml-cstruct#471ca03b49b3a372945fcf13c89e0447a8bd3932" ] [ "cstruct-unix.dev" "git+https://github.com/djs55/ocaml-cstruct#471ca03b49b3a372945fcf13c89e0447a8bd3932" ] ] \ No newline at end of file From ec54ccb9e7f0edf31684693bb88c83afe2eb90b8 Mon Sep 17 00:00:00 2001 From: Patrick Ferris Date: Sat, 29 Apr 2023 14:00:29 +0100 Subject: [PATCH 06/18] More posix to windows clean up --- lib_eio_windows/children.ml | 88 ------------------- lib_eio_windows/children.mli | 14 --- lib_eio_windows/dune | 3 +- ...{eio_posix_stubs.c => eio_windows_stubs.c} | 0 4 files changed, 1 insertion(+), 104 deletions(-) delete mode 100755 lib_eio_windows/children.ml delete mode 100755 lib_eio_windows/children.mli rename lib_eio_windows/{eio_posix_stubs.c => eio_windows_stubs.c} (100%) diff --git a/lib_eio_windows/children.ml b/lib_eio_windows/children.ml deleted file mode 100755 index d5a12d139..000000000 --- a/lib_eio_windows/children.ml +++ /dev/null @@ -1,88 +0,0 @@ -(* Keep track of running child processes and notify their fiber when they exit. - After forking a child process, it gets registered in the global [db] along with a resolver - for the promise of its exit status. When we get a SIGCHLD signal, we reap all exited processes - and resolve their promises, waking whichever fibers are waiting for them. - - We have to be careful not to use a PID after [wait] reaps it, as the PID could have been reused by then. - - The signal handler can run in any domain or systhread, so we have to be careful about that too. - We can't defer the call to [wait] until we're running in an Eio domain as we don't know which domain - should handle it until [wait] gives as the process ID. We don't want to delegate to a particular domain - because it might be spinning doing CPU stuff for a long time. Instead, we try to take the lock in the - signal handler and do it there. If we can't get the lock then we just record that a wait is needed; - whoever holds the lock will soon release it and will do the reaping for us. - - Note that, since signal handlers are global, - this will interfere with any libraries trying to manage processes themselves. - - For systems with Process Descriptors we could skip all this nonsense and - just poll on the process's FD. e.g. using [pdfork] on FreeBSD or [CLONE_PIDFD] on Linux. *) - -open Eio.Std - -(* Each child process is registered in this table. - Must hold [lock] when accessing it. *) -let db : (int, Unix.process_status Promise.u) Hashtbl.t = Hashtbl.create 10 - -(* Set to [true] when we receive [SIGCHLD] and [false] before calling [wait]. *) -let need_wait = Atomic.make false - -(* [lock] must be held when spawning or reaping. Otherwise, this can happen: - - - We spawn process 100, adding it to [db]. - - It exits, sending us SIGCHLD. - - The signal handler calls [wait], reaping it. - - Another domain spawns another process 100 and adds it to [db], - overwriting the previous entry. - - The signal handler resumes, and gets the wrong entry. - - If [lock] is already locked when the SIGCHLD handler runs then it just leaves [need_wait = true] - (a signal handler can't wait on a mutex, since it may have interrupted the holder). - The unlocker needs to check [need_wait] after releasing the lock. *) -let lock = Mutex.create () - -(* [pid] has exited. Notify the waiter. Must hold [lock] when calling this. *) -let report_child_status pid status = - match Hashtbl.find_opt db pid with - | Some r -> - Hashtbl.remove db pid; - Promise.resolve r status - | None -> - (* Not one of ours. Not much we can do here. The spawner will probably get - an [ECHILD] error when they wait, which will do for the error. *) - () - -(* Must hold [lock] when calling this. *) -let rec reap () = - Atomic.set need_wait false; - match Unix.(waitpid [WNOHANG] (-1)) with - | 0, _ -> () (* Returned if there are children but none has exited yet. *) - | pid, status -> report_child_status pid status; reap () - | exception Unix.Unix_error (EINTR, _, _) -> reap () - | exception Unix.Unix_error (ECHILD, _, _) -> () (* Returned if there are no children at all. *) - -let rec reap_nonblocking () = - if Mutex.try_lock lock then ( - reap (); - Mutex.unlock lock; - if Atomic.get need_wait then reap_nonblocking () - ) (* else the unlocker will see [need_wait] and call us later *) - -let unlock () = - Mutex.unlock lock; - if Atomic.get need_wait then reap_nonblocking () - -(* Must hold [lock] when calling this. *) -let register pid = - assert (not (Hashtbl.mem db pid)); - let p, r = Promise.create () in - Hashtbl.add db pid r; - p - -let with_lock fn = - Mutex.lock lock; - Fun.protect fn ~finally:unlock - -let handle_sigchld () = - Atomic.set need_wait true; - reap_nonblocking () diff --git a/lib_eio_windows/children.mli b/lib_eio_windows/children.mli deleted file mode 100755 index a8ead2276..000000000 --- a/lib_eio_windows/children.mli +++ /dev/null @@ -1,14 +0,0 @@ -(** Keep track of child processes and respond to SIGCHLD. *) - -val with_lock : (unit -> 'a) -> 'a -(** This must be held during the (fork, register) sequence - (so that we don't try to reap the process before it's registered), - and also when signalling a child process - (to ensure it isn't reaped at the same time). *) - -val register : int -> Unix.process_status Eio.Promise.t -(** [register pid] adds [pid] to the list of children and returns a promise for its exit status. - You must hold the lock while forking and then calling this. *) - -val handle_sigchld : unit -> unit -(** Call this on [SIGCHLD]. *) diff --git a/lib_eio_windows/dune b/lib_eio_windows/dune index aee7582f1..ab9a1e166 100644 --- a/lib_eio_windows/dune +++ b/lib_eio_windows/dune @@ -5,9 +5,8 @@ (enabled_if (= %{os_type} "Win32")) (foreign_stubs (language c) - (flags :standard -D_LARGEFILE64_SOURCE) (include_dirs ../lib_eio/unix/include) - (names eio_posix_stubs)) + (names eio_windows_stubs)) (libraries eio eio.unix eio.utils fmt cstruct-unix)) (rule diff --git a/lib_eio_windows/eio_posix_stubs.c b/lib_eio_windows/eio_windows_stubs.c similarity index 100% rename from lib_eio_windows/eio_posix_stubs.c rename to lib_eio_windows/eio_windows_stubs.c From 40d758b3626bfc1020ed3e2046381798be7882f7 Mon Sep 17 00:00:00 2001 From: Patrick Ferris Date: Sat, 29 Apr 2023 18:07:30 +0100 Subject: [PATCH 07/18] Remove incorrect Obj.magic --- lib_eio_windows/sched.ml | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/lib_eio_windows/sched.ml b/lib_eio_windows/sched.ml index f06d986d3..87580f26e 100755 --- a/lib_eio_windows/sched.ml +++ b/lib_eio_windows/sched.ml @@ -37,12 +37,17 @@ type fd_event_waiters = { write : unit Suspended.t Lwt_dllist.t; } -module FdMap = Map.Make (Int) +module FdCompare = struct + type t = Unix.file_descr + let compare = Stdlib.compare +end + +module FdSet = Set.Make (FdCompare) (* A structure for storing the file descriptors for select. *) type poll = { - mutable to_read : Unix.file_descr FdMap.t; - mutable to_write : Unix.file_descr FdMap.t; + mutable to_read : FdSet.t; + mutable to_write : FdSet.t; } type t = { @@ -50,7 +55,6 @@ type t = { run_q : runnable Lf_queue.t; poll : poll; - mutable poll_maxi : int; (* The highest index ever used in [poll]. *) fd_map : (Unix.file_descr, fd_event_waiters) Hashtbl.t; (* When adding to [run_q] from another domain, this domain may be sleeping and so won't see the event. @@ -114,7 +118,6 @@ let clear_event_fd t = (* Update [t.poll]'s entry for [fd] to match [waiters]. *) let update t waiters fd = - let fdi : int = Obj.magic fd in let flags = match not (Lwt_dllist.is_empty waiters.read), not (Lwt_dllist.is_empty waiters.write) with @@ -125,15 +128,15 @@ let update t waiters fd = in match flags with | `Empty -> ( - t.poll.to_read <- FdMap.remove fdi t.poll.to_read; - t.poll.to_write <- FdMap.remove fdi t.poll.to_write; + t.poll.to_read <- FdSet.remove fd t.poll.to_read; + t.poll.to_write <- FdSet.remove fd t.poll.to_write; Hashtbl.remove t.fd_map fd ) - | `R -> t.poll.to_read <- FdMap.add fdi fd t.poll.to_read - | `W -> t.poll.to_write <- FdMap.add fdi fd t.poll.to_write + | `R -> t.poll.to_read <- FdSet.add fd t.poll.to_read + | `W -> t.poll.to_write <- FdSet.add fd t.poll.to_write | `RW -> - t.poll.to_read <- FdMap.add fdi fd t.poll.to_read; - t.poll.to_write <- FdMap.add fdi fd t.poll.to_write + t.poll.to_read <- FdSet.add fd t.poll.to_read; + t.poll.to_write <- FdSet.add fd t.poll.to_write let resume t node = t.active_ops <- t.active_ops - 1; @@ -202,9 +205,9 @@ let rec next t : [`Exit_scheduler] = If [need_wakeup] is still [true], this is fine because we don't promise to do that. If [need_wakeup = false], a wake-up event will arrive and wake us up soon. *) Ctf.(note_hiatus Wait_for_work); - let cons _ fd acc = fd :: acc in - let read = FdMap.fold cons t.poll.to_read [] in - let write = FdMap.fold cons t.poll.to_write [] in + let cons fd acc = fd :: acc in + let read = FdSet.fold cons t.poll.to_read [] in + let write = FdSet.fold cons t.poll.to_write [] in match Unix.select read write [] timeout with | exception Unix.(Unix_error (EINTR, _, _)) -> next t | readable, writeable, _ -> @@ -237,14 +240,11 @@ let with_sched fn = let was_open = Rcfd.close eventfd in assert was_open in - let poll = { to_read = FdMap.empty; to_write = FdMap.empty } in + let poll = { to_read = FdSet.empty; to_write = FdSet.empty } in let fd_map = Hashtbl.create 10 in - let t = { run_q; poll; poll_maxi = (-1); fd_map; eventfd; eventfd_r; + let t = { run_q; poll; fd_map; eventfd; eventfd_r; active_ops = 0; need_wakeup = Atomic.make false; sleep_q } in - let eventfd_ri : int = Obj.magic eventfd_r in - t.poll.to_read <- FdMap.add eventfd_ri eventfd_r t.poll.to_read; - if eventfd_ri > t.poll_maxi then - t.poll_maxi <- eventfd_ri; + t.poll.to_read <- FdSet.add eventfd_r t.poll.to_read; match fn t with | x -> cleanup (); x | exception ex -> From 0917e26e3e98ec13252492ea91312c9eb72ecee7 Mon Sep 17 00:00:00 2001 From: Patrick Ferris Date: Tue, 2 May 2023 14:12:10 +0100 Subject: [PATCH 08/18] Update comments and stubs --- lib_eio/unix/fork/dune | 4 ---- lib_eio/unix/fork_action.c | 6 +++--- lib_eio_windows/eio_windows_stubs.c | 10 +++++----- lib_eio_windows/sched.ml | 2 +- 4 files changed, 9 insertions(+), 13 deletions(-) delete mode 100755 lib_eio/unix/fork/dune diff --git a/lib_eio/unix/fork/dune b/lib_eio/unix/fork/dune deleted file mode 100755 index dd2f8f134..000000000 --- a/lib_eio/unix/fork/dune +++ /dev/null @@ -1,4 +0,0 @@ -(library - (name eio_fork_actions) - (public_name eio.fork_actions) - (libraries )) \ No newline at end of file diff --git a/lib_eio/unix/fork_action.c b/lib_eio/unix/fork_action.c index 99c87ff7e..8ed452387 100644 --- a/lib_eio/unix/fork_action.c +++ b/lib_eio/unix/fork_action.c @@ -9,10 +9,10 @@ #include "fork_action.h" +#ifdef _WIN32 +#else void eio_unix_run_fork_actions(int errors, value v_actions) { - #ifdef _WIN32 uerror("Unsupported operation on windows", Nothing); - #else int old_flags = fcntl(errors, F_GETFL, 0); fcntl(errors, F_SETFL, old_flags & ~O_NONBLOCK); while (Is_block(v_actions)) { @@ -22,8 +22,8 @@ void eio_unix_run_fork_actions(int errors, value v_actions) { v_actions = Field(v_actions, 1); } _exit(1); - #endif } +#endif static void try_write_all(int fd, char *buf) { int len = strlen(buf); diff --git a/lib_eio_windows/eio_windows_stubs.c b/lib_eio_windows/eio_windows_stubs.c index fd91b3fae..6120788b9 100755 --- a/lib_eio_windows/eio_windows_stubs.c +++ b/lib_eio_windows/eio_windows_stubs.c @@ -51,22 +51,22 @@ CAMLprim value caml_eio_windows_getrandom(value v_ba, value v_off, value v_len) CAMLprim value caml_eio_windows_readv(value v_fd, value v_bufs) { - uerror("Readv is not supported on windows yet", Nothing); + uerror("readv is not supported on windows yet", Nothing); } CAMLprim value caml_eio_windows_preadv(value v_fd, value v_bufs, value v_offset) { - uerror("Preadv is not supported on windows yet", Nothing); + uerror("preadv is not supported on windows yet", Nothing); } CAMLprim value caml_eio_windows_pwritev(value v_fd, value v_bufs, value v_offset) { - uerror("Pwritev is not supported on windows yet", Nothing); + uerror("pwritev is not supported on windows yet", Nothing); } CAMLprim value caml_eio_windows_openat(value v_dirfd, value v_pathname, value v_flags, value v_mode) { - uerror("Readv is not supported on windows yet", Nothing); + uerror("openat is not supported on windows yet", Nothing); } CAMLprim value caml_eio_windows_mkdirat(value v_fd, value v_path, value v_perm) @@ -86,5 +86,5 @@ CAMLprim value caml_eio_windows_renameat(value v_old_fd, value v_old_path, value CAMLprim value caml_eio_windows_spawn(value v_errors, value v_actions) { - uerror("Processes are not supported on windows yet", Nothing); + uerror("processes are not supported on windows yet", Nothing); } diff --git a/lib_eio_windows/sched.ml b/lib_eio_windows/sched.ml index 87580f26e..1f4721206 100755 --- a/lib_eio_windows/sched.ml +++ b/lib_eio_windows/sched.ml @@ -230,7 +230,7 @@ let with_sched fn = let run_q = Lf_queue.create () in Lf_queue.push run_q IO; let sleep_q = Zzz.create () in - (* Pipe's on Windows cannot be nonblocking through the OCaml API. *) + (* Pipes on Windows cannot be nonblocking through the OCaml API. *) let eventfd_r, eventfd_w = Unix.socketpair ~cloexec:true Unix.PF_UNIX Unix.SOCK_STREAM 0 in Unix.set_nonblock eventfd_r; Unix.set_nonblock eventfd_w; From 7fe1080e94bfe2f930358e1ad80dba330ab2ee02 Mon Sep 17 00:00:00 2001 From: Patrick Ferris Date: Tue, 2 May 2023 14:14:53 +0100 Subject: [PATCH 09/18] Remove Fs from windows --- lib_eio_windows/eio_windows.ml | 4 +- lib_eio_windows/fs.ml | 184 --------------------------------- 2 files changed, 2 insertions(+), 186 deletions(-) delete mode 100755 lib_eio_windows/fs.ml diff --git a/lib_eio_windows/eio_windows.ml b/lib_eio_windows/eio_windows.ml index 0e5ac6f18..c69d00008 100755 --- a/lib_eio_windows/eio_windows.ml +++ b/lib_eio_windows/eio_windows.ml @@ -43,7 +43,7 @@ let run main = method mono_clock = Time.mono_clock method net = Net.v method domain_mgr = Domain_mgr.v - method cwd = ((Fs.cwd, "") :> Eio.Fs.dir Eio.Path.t) - method fs = ((Fs.fs, "") :> Eio.Fs.dir Eio.Path.t) + method cwd = failwith "file-system operations not supported on Windows yet" + method fs = failwith "file-system operations not supported on Windows yet" method secure_random = Flow.secure_random end diff --git a/lib_eio_windows/fs.ml b/lib_eio_windows/fs.ml deleted file mode 100755 index 7afeba2f7..000000000 --- a/lib_eio_windows/fs.ml +++ /dev/null @@ -1,184 +0,0 @@ -(* - * Copyright (C) 2023 Thomas Leonard - * - * Permission to use, copy, modify, and distribute this software for any - * purpose with or without fee is hereby granted, provided that the above - * copyright notice and this permission notice appear in all copies. - * - * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - *) - -(* This module provides (optional) sandboxing, allowing operations to be restricted to a subtree. - - For now, sandboxed directories use realpath and [O_NOFOLLOW], which is probably quite slow, - and requires duplicating a load of path lookup logic from the kernel. - It might be better to hold a directory FD rather than a path. - On FreeBSD we could use O_RESOLVE_BENEATH and let the OS handle everything for us. - On other systems we would have to resolve one path component at a time. *) - -open Eio.Std - -module Fd = Eio_unix.Fd - -class virtual posix_dir = object - inherit Eio.Fs.dir - - val virtual opt_nofollow : Low_level.Open_flags.t - (** Extra flags for open operations. Sandboxes will add [O_NOFOLLOW] here. *) - - method virtual private resolve : string -> string - (** [resolve path] returns the real path that should be used to access [path]. - For sandboxes, this is [realpath path] (and it checks that it is within the sandbox). - For unrestricted access, this is the identity function. *) - - method virtual with_parent_dir : 'a. (string -> (Fd.t option -> string -> 'a) -> 'a) - (** [with_parent_dir path fn] runs [fn dir_fd rel_path], - where [rel_path] accessed relative to [dir_fd] gives access to [path]. - For unrestricted access, this just runs [fn None path]. - For sandboxes, it opens the parent of [path] as [dir_fd] and runs [fn (Some dir_fd) (basename path)]. *) -end - -(* When renaming, we get a plain [Eio.Fs.dir]. We need extra access to check - that the new location is within its sandbox. *) -type _ Eio.Generic.ty += Posix_dir : posix_dir Eio.Generic.ty -let as_posix_dir x = Eio.Generic.probe x Posix_dir - -class virtual dir ~label = object (self) - inherit posix_dir - - val mutable closed = false - - method! probe : type a. a Eio.Generic.ty -> a option = function - | Posix_dir -> Some (self :> posix_dir) - | _ -> None - - method open_in ~sw path = - let fd = Err.run (Low_level.openat ~mode:0 ~sw (self#resolve path)) Low_level.Open_flags.(opt_nofollow + rdonly) in - (Flow.of_fd fd :> ) - - method open_out ~sw ~append ~create path = - let mode, flags = - match create with - | `Never -> 0, Low_level.Open_flags.empty - | `If_missing perm -> perm, Low_level.Open_flags.creat - | `Or_truncate perm -> perm, Low_level.Open_flags.(creat + trunc) - | `Exclusive perm -> perm, Low_level.Open_flags.(creat + excl) - in - let flags = if append then Low_level.Open_flags.(flags + append) else flags in - let flags = Low_level.Open_flags.(flags + rdwr + opt_nofollow) in - match - self#with_parent_dir path @@ fun dirfd path -> - Low_level.openat ?dirfd ~sw ~mode path flags - with - | fd -> (Flow.of_fd fd :> ) - | exception Unix.Unix_error (ELOOP, _, _) -> - (* The leaf was a symlink (or we're unconfined and the main path changed, but ignore that). - A leaf symlink might be OK, but we need to check it's still in the sandbox. - todo: possibly we should limit the number of redirections here, like the kernel does. *) - let target = Unix.readlink path in - let full_target = - if Filename.is_relative target then - Filename.concat (Filename.dirname path) target - else target - in - self#open_out ~sw ~append ~create full_target - | exception Unix.Unix_error (code, name, arg) -> - raise (Err.wrap code name arg) - - method mkdir ~perm path = - self#with_parent_dir path @@ fun dirfd path -> - Err.run (Low_level.mkdir ?dirfd ~mode:perm) path - - method unlink path = - self#with_parent_dir path @@ fun dirfd path -> - Err.run (Low_level.unlink ?dirfd ~dir:false) path - - method rmdir path = - self#with_parent_dir path @@ fun dirfd path -> - Err.run (Low_level.unlink ?dirfd ~dir:true) path - - method read_dir path = - (* todo: need fdopendir here to avoid races *) - let path = self#resolve path in - Err.run Low_level.readdir path - |> Array.to_list - - method rename old_path new_dir new_path = - match as_posix_dir new_dir with - | None -> invalid_arg "Target is not an eio_posix directory!" - | Some new_dir -> - self#with_parent_dir old_path @@ fun old_dir old_path -> - new_dir#with_parent_dir new_path @@ fun new_dir new_path -> - Err.run (Low_level.rename ?old_dir old_path ?new_dir) new_path - - method open_dir ~sw path = - Switch.check sw; - let label = Filename.basename path in - let d = new sandbox ~label (self#resolve path) in - Switch.on_release sw (fun () -> d#close); - (d :> Eio.Fs.dir_with_close) - - method close = closed <- true - - method pp f = Fmt.string f (String.escaped label) -end - -and sandbox ~label dir_path = object (self) - inherit dir ~label - - val opt_nofollow = Low_level.Open_flags.empty - (* TODO: Low_level.Open_flags.nofollow *) - - (* Resolve a relative path to an absolute one, with no symlinks. - @raise Eio.Fs.Permission_denied if it's outside of [dir_path]. *) - method private resolve path = - if closed then Fmt.invalid_arg "Attempt to use closed directory %S" dir_path; - if Filename.is_relative path then ( - let dir_path = Err.run Low_level.realpath dir_path in - let full = Err.run Low_level.realpath (Filename.concat dir_path path) in - let prefix_len = String.length dir_path + 1 in - if String.length full >= prefix_len && String.sub full 0 prefix_len = dir_path ^ Filename.dir_sep then - full - else if full = dir_path then - full - else - raise @@ Eio.Fs.err (Permission_denied (Err.Outside_sandbox (full, dir_path))) - ) else ( - raise @@ Eio.Fs.err (Permission_denied Err.Absolute_path) - ) - - method with_parent_dir path fn = - if closed then Fmt.invalid_arg "Attempt to use closed directory %S" dir_path; - let dir, leaf = Filename.dirname path, Filename.basename path in - if leaf = ".." then ( - (* We could be smarter here and normalise the path first, but '..' - doesn't make sense for any of the current uses of [with_parent_dir] - anyway. *) - raise (Eio.Fs.err (Permission_denied (Err.Invalid_leaf leaf))) - ) else ( - let dir = self#resolve dir in - Switch.run @@ fun sw -> - (* TODO: Directory and no-follow needed *) - let dirfd = Low_level.openat ~sw ~mode:0 dir Low_level.Open_flags.rdonly in - fn (Some dirfd) leaf - ) -end - -(* Full access to the filesystem. *) -let fs = object - inherit dir ~label:"fs" - - val opt_nofollow = Low_level.Open_flags.empty - - (* No checks *) - method private resolve path = path - method private with_parent_dir path fn = fn None path -end - -let cwd = new sandbox ~label:"cwd" "." From 46105436d263c94f8736ccfff4248c10226f196b Mon Sep 17 00:00:00 2001 From: Patrick Ferris Date: Tue, 2 May 2023 14:23:10 +0100 Subject: [PATCH 10/18] Add tuareg condition to windows test --- lib_eio_windows/test/dune | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/lib_eio_windows/test/dune b/lib_eio_windows/test/dune index 47626ef77..22eb7d91d 100755 --- a/lib_eio_windows/test/dune +++ b/lib_eio_windows/test/dune @@ -1,5 +1,13 @@ +(* -*- tuareg -*- *) + +let win32 = List.mem ("os_type", "Win32") Jbuild_plugin.V1.ocamlc_config + +let () = Jbuild_plugin.V1.send @@ if not win32 then "" else {| + (test (name test) (package eio_windows) (enabled_if (= %{os_type} "Win32")) (libraries alcotest eio.mock eio_windows)) + +|} \ No newline at end of file From 106ea8a797e5f6129ee9cfcd31949a66239a9827 Mon Sep 17 00:00:00 2001 From: Patrick Ferris Date: Tue, 2 May 2023 14:37:39 +0100 Subject: [PATCH 11/18] Fix bug introduced in fork_actions --- lib_eio/unix/fork_action.c | 1 - 1 file changed, 1 deletion(-) diff --git a/lib_eio/unix/fork_action.c b/lib_eio/unix/fork_action.c index 8ed452387..38dbb02a8 100644 --- a/lib_eio/unix/fork_action.c +++ b/lib_eio/unix/fork_action.c @@ -12,7 +12,6 @@ #ifdef _WIN32 #else void eio_unix_run_fork_actions(int errors, value v_actions) { - uerror("Unsupported operation on windows", Nothing); int old_flags = fcntl(errors, F_GETFL, 0); fcntl(errors, F_SETFL, old_flags & ~O_NONBLOCK); while (Is_block(v_actions)) { From fa934d09c2ee85366ba954102e65f7e5ae5999f4 Mon Sep 17 00:00:00 2001 From: Patrick Ferris Date: Wed, 3 May 2023 11:28:33 +0100 Subject: [PATCH 12/18] Use ifndef Co-authored-by: Thomas Leonard --- lib_eio/unix/fork_action.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib_eio/unix/fork_action.c b/lib_eio/unix/fork_action.c index 38dbb02a8..fdef08a31 100644 --- a/lib_eio/unix/fork_action.c +++ b/lib_eio/unix/fork_action.c @@ -9,8 +9,7 @@ #include "fork_action.h" -#ifdef _WIN32 -#else +#ifndef _WIN32 void eio_unix_run_fork_actions(int errors, value v_actions) { int old_flags = fcntl(errors, F_GETFL, 0); fcntl(errors, F_SETFL, old_flags & ~O_NONBLOCK); From 695499e21492dba70fec84ab3a12063a1414ce63 Mon Sep 17 00:00:00 2001 From: Patrick Ferris Date: Thu, 4 May 2023 23:10:31 +0100 Subject: [PATCH 13/18] Disable MDX tests on Windows --- lib_eio/tests/dune | 1 + 1 file changed, 1 insertion(+) diff --git a/lib_eio/tests/dune b/lib_eio/tests/dune index 48fdb1987..217338572 100644 --- a/lib_eio/tests/dune +++ b/lib_eio/tests/dune @@ -1,5 +1,6 @@ (mdx (package eio) + (enabled_if (<> %{os_type} "Win32")) (deps (package eio) (file ./dscheck/fake_sched.ml) From f3d70518fa4268390807db7c8ce48ae80e72fa2d Mon Sep 17 00:00:00 2001 From: Patrick Ferris Date: Fri, 5 May 2023 00:24:26 +0100 Subject: [PATCH 14/18] Add more windows tests and fix network code --- lib_eio_windows/domain_mgr.ml | 3 +- lib_eio_windows/low_level.ml | 6 -- lib_eio_windows/net.ml | 12 ++-- lib_eio_windows/test/test.ml | 56 ++++----------- lib_eio_windows/test/test_net.ml | 118 +++++++++++++++++++++++++++++++ 5 files changed, 139 insertions(+), 56 deletions(-) create mode 100755 lib_eio_windows/test/test_net.ml diff --git a/lib_eio_windows/domain_mgr.ml b/lib_eio_windows/domain_mgr.ml index 1c054d83e..f6c2df733 100755 --- a/lib_eio_windows/domain_mgr.ml +++ b/lib_eio_windows/domain_mgr.ml @@ -30,7 +30,8 @@ let run_event_loop fn x = | Eio_unix.Private.Get_monotonic_clock -> Some (fun k -> continue k (Time.mono_clock : Eio.Time.Mono.t)) | Eio_unix.Private.Socket_of_fd (sw, close_unix, unix_fd) -> Some (fun k -> let fd = Fd.of_unix ~sw ~blocking:false ~close_unix unix_fd in - Unix.set_nonblock unix_fd; + (* TODO: On Windows, if the FD from Unix.pipe () is passed this will fail *) + (try Unix.set_nonblock unix_fd with Unix.Unix_error (Unix.ENOTSOCK, _, _) -> ()); continue k (Flow.of_fd fd :> Eio_unix.socket) ) | Eio_unix.Private.Socketpair (sw, domain, ty, protocol) -> Some (fun k -> diff --git a/lib_eio_windows/low_level.ml b/lib_eio_windows/low_level.ml index 7e35d7b88..e52bbb20f 100755 --- a/lib_eio_windows/low_level.ml +++ b/lib_eio_windows/low_level.ml @@ -162,12 +162,6 @@ module Open_flags = struct let cloexec = Config.o_noinherit let creat = Config.o_creat let excl = Config.o_excl - (* let directory = Config.o_directory - let dsync = Config.o_dsync - let noctty = Config.o_noctty - let nofollow = Config.o_nofollow *) - (* let nonblock = Config.o_nonblock *) - (* let sync = Config.o_sync *) let trunc = Config.o_trunc let empty = 0 diff --git a/lib_eio_windows/net.ml b/lib_eio_windows/net.ml index 27921307d..4f771ef4f 100755 --- a/lib_eio_windows/net.ml +++ b/lib_eio_windows/net.ml @@ -82,7 +82,7 @@ let getaddrinfo ~service node = aux () let listen ~reuse_addr ~reuse_port ~backlog ~sw (listen_addr : Eio.Net.Sockaddr.stream) = - let socket_type, addr = + let socket_type, addr, is_unix_socket = match listen_addr with | `Unix path -> if reuse_addr then ( @@ -92,10 +92,10 @@ let listen ~reuse_addr ~reuse_port ~backlog ~sw (listen_addr : Eio.Net.Sockaddr. | exception Unix.Unix_error (Unix.ENOENT, _, _) -> () | exception Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap code name arg ); - Unix.SOCK_STREAM, Unix.ADDR_UNIX path + Unix.SOCK_STREAM, Unix.ADDR_UNIX path, true | `Tcp (host, port) -> let host = Eio_unix.Ipaddr.to_unix host in - Unix.SOCK_STREAM, Unix.ADDR_INET (host, port) + Unix.SOCK_STREAM, Unix.ADDR_INET (host, port), false in let sock = Low_level.socket ~sw (socket_domain_of listen_addr) socket_type 0 in (* For Unix domain sockets, remove the path when done (except for abstract sockets). *) @@ -107,12 +107,14 @@ let listen ~reuse_addr ~reuse_port ~backlog ~sw (listen_addr : Eio.Net.Sockaddr. Switch.null_hook in Fd.use_exn "listen" sock (fun fd -> - if reuse_addr then + (* REUSEADDR cannot be set on a Windows UNIX domain socket, + otherwise the Unix.bind will fail! *) + if not is_unix_socket && reuse_addr then Unix.setsockopt fd Unix.SO_REUSEADDR true; if reuse_port then Unix.setsockopt fd Unix.SO_REUSEPORT true; Unix.bind fd addr; - Unix.listen fd backlog; + Unix.listen fd backlog ); listening_socket ~hook sock diff --git a/lib_eio_windows/test/test.ml b/lib_eio_windows/test/test.ml index eac63960a..db699ab61 100755 --- a/lib_eio_windows/test/test.ml +++ b/lib_eio_windows/test/test.ml @@ -12,56 +12,24 @@ module Timeout = struct ] end -module Net = struct - open Eio.Std - - let read_all flow = - let b = Buffer.create 100 in - Eio.Flow.copy flow (Eio.Flow.buffer_sink b); - Buffer.contents b - - let run_client ~sw ~net ~addr = - traceln "Connecting to server..."; - let flow = Eio.Net.connect ~sw net addr in - Eio.traceln "connected"; - Eio.Flow.copy_string "Hello from client" flow; - Eio.Flow.shutdown flow `Send; - let msg = read_all flow in - msg - - let run_server ~sw msg socket = - Eio.Net.accept_fork socket ~sw (fun flow _addr -> - traceln "Server accepted connection from client"; - Fun.protect (fun () -> - let msg = read_all flow in - traceln "Server received: %S" msg - ) ~finally:(fun () -> Eio.Flow.copy_string msg flow) - ) - ~on_error:(function - | ex -> traceln "Error handling connection: %s" (Printexc.to_string ex) - ) - - let test_client_server env () = - Eio.Switch.run @@ fun sw -> - let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, 8081) in - let server = Eio.Net.listen env#net ~sw ~reuse_addr:true ~backlog:5 addr in - let msg = "From the server" in - Fiber.both - (fun () -> run_server ~sw msg server) - (fun () -> - let client_msg = run_client ~sw ~net:env#net ~addr in - Alcotest.(check string) "same message" msg client_msg - ) - +module Random = struct + let test_random env () = + let src = Eio.Stdenv.secure_random env in + let b1 = Cstruct.create 8 in + let b2 = Cstruct.create 8 in + Eio.Flow.read_exact src b1; + Eio.Flow.read_exact src b2; + Alcotest.(check bool) "different random" (not (Cstruct.equal b1 b2)) true let tests env = [ - "server-client", `Quick, test_client_server env + "different", `Quick, test_random env ] end let () = Eio_windows.run @@ fun env -> Alcotest.run "eio_windows" [ - "net", Net.tests env; - "timeout", Timeout.tests env + "net", Test_net.tests env; + "timeout", Timeout.tests env; + "random", Random.tests env ] \ No newline at end of file diff --git a/lib_eio_windows/test/test_net.ml b/lib_eio_windows/test/test_net.ml new file mode 100755 index 000000000..0877c9f0a --- /dev/null +++ b/lib_eio_windows/test/test_net.ml @@ -0,0 +1,118 @@ +open Eio.Std + +let read_all flow = + let b = Buffer.create 100 in + Eio.Flow.copy flow (Eio.Flow.buffer_sink b); + Buffer.contents b + +let run_client ~sw ~net ~addr = + traceln "Connecting to server..."; + let flow = Eio.Net.connect ~sw net addr in + Eio.traceln "connected"; + Eio.Flow.copy_string "Hello from client" flow; + Eio.Flow.shutdown flow `Send; + let msg = read_all flow in + msg + +let run_server ~sw msg socket = + Eio.Net.accept_fork socket ~sw (fun flow _addr -> + traceln "Server accepted connection from client"; + Fun.protect (fun () -> + let msg = read_all flow in + traceln "Server received: %S" msg + ) ~finally:(fun () -> Eio.Flow.copy_string msg flow) + ) + ~on_error:(function + | ex -> traceln "Error handling connection: %s" (Printexc.to_string ex) + ) + +let test_client_server env addr () = + Eio.Switch.run @@ fun sw -> + let server = Eio.Net.listen env#net ~sw ~reuse_addr:true ~backlog:5 addr in + let msg = "From the server" in + Fiber.both + (fun () -> run_server ~sw msg server) + (fun () -> + let client_msg = run_client ~sw ~net:env#net ~addr in + Alcotest.(check string) "same message" msg client_msg + ) + +let run_dgram addr ~net sw msg = + let e1 = `Udp (addr, 8081) in + let e2 = `Udp (addr, 8082) in + let listening_socket = Eio.Net.datagram_socket ~sw net e2 in + Fiber.both + (fun () -> + let buf = Cstruct.create 20 in + traceln "Waiting to receive data on %a" Eio.Net.Sockaddr.pp e2; + let addr, recv = Eio.Net.recv listening_socket buf in + traceln "Received message from %a" + Eio.Net.Sockaddr.pp addr; + Alcotest.(check string) "same udp msg" msg (Cstruct.(to_string (sub buf 0 recv))) + ) + (fun () -> + let e = Eio.Net.datagram_socket ~sw net e1 in + traceln "Sending data from %a to %a" Eio.Net.Sockaddr.pp e1 Eio.Net.Sockaddr.pp e2; + Eio.Net.send e e2 (Cstruct.of_string msg)) + +let test_udp env addr () = + Eio.Switch.run @@ fun sw -> + run_dgram addr ~net:env#net sw "UDP on Windows" + +let test_fd env addr () = + Eio.Switch.run @@ fun sw -> + let addr = `Tcp (addr, 8081) in + let server = Eio.Net.listen env#net ~sw ~reuse_addr:true ~backlog:5 addr in + Alcotest.(check bool) "Listening socket has Unix FD" (Eio_unix.Resource.fd_opt server <> None) true; + let have_client, have_server = + Fiber.pair + (fun () -> + let flow = Eio.Net.connect ~sw env#net addr in + (Eio_unix.Resource.fd_opt flow <> None) + ) + (fun () -> + let flow, _addr = Eio.Net.accept ~sw server in + (Eio_unix.Resource.fd_opt flow <> None) + ) + in + Alcotest.(check bool) "Client-side socket has Unix FD" have_client true; + Alcotest.(check bool) "Server-side socket has Unix FD" have_server true + +let test_wrap_socket pipe_or_socketpair () = + Switch.run @@ fun sw -> + let r, w = + match pipe_or_socketpair with + | `Pipe -> Unix.pipe () + | `Socketpair -> Unix.socketpair Unix.PF_UNIX Unix.SOCK_STREAM 0 + in + let source = (Eio_unix.import_socket_stream ~sw ~close_unix:true r :> Eio.Flow.source) in + let sink = (Eio_unix.import_socket_stream ~sw ~close_unix:true w :> Eio.Flow.sink) in + let msg = "Hello" in + Fiber.both + (fun () -> Eio.Flow.copy_string (msg ^ "\n") sink) + (fun () -> + let b = Eio.Buf_read.of_flow source ~max_size:1000 in + Alcotest.(check string) "same message" (Eio.Buf_read.line b) msg + ) + +let test_eio_socketpair () = + Switch.run @@ fun sw -> + let a, b = Eio_unix.socketpair ~sw () in + ignore (Eio_unix.Resource.fd a : Eio_unix.Fd.t); + ignore (Eio_unix.Resource.fd b : Eio_unix.Fd.t); + Eio.Flow.copy_string "foo" a; + Eio.Flow.close a; + let msg = Eio.Buf_read.of_flow b ~max_size:10 |> Eio.Buf_read.take_all in + Alcotest.(check string) "same messagw" "foo" msg + +let tests env = [ + "tcp-ip4", `Quick, test_client_server env (`Tcp (Eio.Net.Ipaddr.V4.loopback, 8081)); + "tcp-ip6", `Quick, test_client_server env (`Tcp (Eio.Net.Ipaddr.V6.loopback, 8081)); + "unix", `Quick, test_client_server env (`Unix "eio-test.sock"); + "udp-ip4", `Quick, test_udp env Eio.Net.Ipaddr.V4.loopback; + "udp-ip6", `Quick, test_udp env Eio.Net.Ipaddr.V6.loopback; + "fds", `Quick, test_fd env Eio.Net.Ipaddr.V4.loopback; + "wrap-pipe", `Quick, test_wrap_socket `Pipe; + "wrap-socketpair", `Quick, test_wrap_socket `Socketpair; + "eio-socketpair", `Quick, test_eio_socketpair +] \ No newline at end of file From e5b57fab3242b12b087d3c6b007c7f2cf9915626 Mon Sep 17 00:00:00 2001 From: Patrick Ferris Date: Fri, 5 May 2023 00:29:41 +0100 Subject: [PATCH 15/18] Fix rebase mistake in stubs --- lib_eio/unix/stubs.c | 1 + 1 file changed, 1 insertion(+) diff --git a/lib_eio/unix/stubs.c b/lib_eio/unix/stubs.c index e6ff58bfc..bc0b400bb 100644 --- a/lib_eio/unix/stubs.c +++ b/lib_eio/unix/stubs.c @@ -15,4 +15,5 @@ CAMLprim value eio_unix_is_blocking(value v_fd) { uerror("fcntl", Nothing); return Val_bool((r & O_NONBLOCK) == 0); + #endif } From 9a9c02ba080d63477b5cdcbad5999928bcc59bb0 Mon Sep 17 00:00:00 2001 From: Patrick Ferris Date: Fri, 5 May 2023 12:44:42 +0100 Subject: [PATCH 16/18] Support DLA on Windows --- dune-project | 1 + eio_windows.opam | 1 + lib_eio_windows/sched.ml | 8 ++++++-- lib_eio_windows/test/dune | 2 +- lib_eio_windows/test/test.ml | 26 +++++++++++++++++++++++++- 5 files changed, 34 insertions(+), 4 deletions(-) diff --git a/dune-project b/dune-project index 66cb2f54c..ecb88cb3a 100644 --- a/dune-project +++ b/dune-project @@ -61,6 +61,7 @@ (depends (eio (= :version)) (cstruct-unix (= dev)) + (kcas (and (>= 0.3.0) :with-test)) (alcotest (and (>= 1.4.0) :with-test)))) (package (name eio_main) diff --git a/eio_windows.opam b/eio_windows.opam index 8676ffdbd..b007be8d2 100755 --- a/eio_windows.opam +++ b/eio_windows.opam @@ -12,6 +12,7 @@ depends: [ "dune" {>= "3.7"} "eio" {= version} "cstruct-unix" {= "dev"} + "kcas" {>= "0.3.0" & with-test} "alcotest" {>= "1.4.0" & with-test} "odoc" {with-doc} ] diff --git a/lib_eio_windows/sched.ml b/lib_eio_windows/sched.ml index 1f4721206..0cc587c9e 100755 --- a/lib_eio_windows/sched.ml +++ b/lib_eio_windows/sched.ml @@ -355,8 +355,12 @@ let run ~extra_effects t main x = let result = ref None in let `Exit_scheduler = let new_fiber = Fiber_context.make_root () in - fork ~new_fiber (fun () -> - result := Some (with_op t main x); + Domain_local_await.using + ~prepare_for_await:Eio.Private.Dla.prepare_for_await + ~while_running:(fun () -> + fork ~new_fiber (fun () -> + result := Some (with_op t main x); + ) ) in match !result with diff --git a/lib_eio_windows/test/dune b/lib_eio_windows/test/dune index 22eb7d91d..0b97f06ca 100755 --- a/lib_eio_windows/test/dune +++ b/lib_eio_windows/test/dune @@ -8,6 +8,6 @@ let () = Jbuild_plugin.V1.send @@ if not win32 then "" else {| (name test) (package eio_windows) (enabled_if (= %{os_type} "Win32")) - (libraries alcotest eio.mock eio_windows)) + (libraries alcotest kcas eio.mock eio_windows)) |} \ No newline at end of file diff --git a/lib_eio_windows/test/test.ml b/lib_eio_windows/test/test.ml index db699ab61..510c40964 100755 --- a/lib_eio_windows/test/test.ml +++ b/lib_eio_windows/test/test.ml @@ -26,10 +26,34 @@ module Random = struct ] end +module Dla = struct + + let test_dla () = + let open Kcas in + let x = Loc.make 0 in + let y = Loc.make 0 in + let foreign_domain = Domain.spawn @@ fun () -> + let x = Loc.get_as (fun x -> Retry.unless (x <> 0); x) x in + Loc.set y 22; + x + in + Loc.set x 20; + let y' = Loc.get_as (fun y -> Retry.unless (y <> 0); y) y in + Alcotest.(check int) "correct y" y' 22; + let ans = y' + Domain.join foreign_domain in + Alcotest.(check int) "answer" ans 42 + + let tests = [ + "dla", `Quick, test_dla + ] +end + + let () = Eio_windows.run @@ fun env -> Alcotest.run "eio_windows" [ "net", Test_net.tests env; "timeout", Timeout.tests env; - "random", Random.tests env + "random", Random.tests env; + "dla", Dla.tests ] \ No newline at end of file From 393ec686e389d611814d4453835076289e504ddb Mon Sep 17 00:00:00 2001 From: Patrick Ferris Date: Fri, 5 May 2023 13:04:11 +0100 Subject: [PATCH 17/18] Add kcas --- .github/workflows/main.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 6244b0821..2142ea6bf 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -54,7 +54,7 @@ jobs: - run: | opam pin -yn eio.dev . opam pin -yn eio_windows.dev . - opam install ocamlfind.1.9.5 alcotest mdx crowbar -y + opam install ocamlfind.1.9.5 kcas alcotest mdx crowbar -y opam install eio eio_windows --deps-only - run: opam exec -- dune runtest docker: @@ -62,4 +62,4 @@ jobs: steps: - uses: actions/checkout@v2 - name: Build the Docker image - run: docker build . \ No newline at end of file + run: docker build . From cab124d5e09e4114b5a29c1e60895f7e32ecead1 Mon Sep 17 00:00:00 2001 From: Patrick Ferris Date: Mon, 8 May 2023 16:09:37 +0100 Subject: [PATCH 18/18] Don't allocate OCaml exceptions in Windows fork actions --- lib_eio/unix/fork_action.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib_eio/unix/fork_action.c b/lib_eio/unix/fork_action.c index fdef08a31..7f6ef0c33 100644 --- a/lib_eio/unix/fork_action.c +++ b/lib_eio/unix/fork_action.c @@ -71,7 +71,7 @@ CAMLprim value eio_unix_fork_execve(value v_unit) { static void action_fchdir(int errors, value v_config) { #ifdef _WIN32 - uerror("Unsupported operation on windows", Nothing); + eio_unix_fork_error(errors, "action_fchdir", "Unsupported operation on windows"); #else value v_fd = Field(v_config, 1); int r; @@ -103,7 +103,7 @@ CAMLprim value eio_unix_fork_chdir(value v_unit) { static void set_blocking(int errors, int fd, int blocking) { #ifdef _WIN32 - uerror("Unsupported operation on windows", Nothing); + eio_unix_fork_error(errors, "set_blocking", "Unsupported operation on windows"); #else int r = fcntl(fd, F_GETFL, 0); if (r != -1) { @@ -123,7 +123,7 @@ static void set_blocking(int errors, int fd, int blocking) { static void set_cloexec(int errors, int fd, int cloexec) { #ifdef _WIN32 - uerror("Unsupported operation on windows", Nothing); + eio_unix_fork_error(errors, "set_cloexec", "Unsupported operation on windows"); #else int r = fcntl(fd, F_GETFD, 0); if (r != -1) {