diff --git a/src/handlers.rs b/src/handlers.rs index 0bf08198..03323d82 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -23,12 +23,17 @@ use zenoh::handlers::{CallbackParameter, IntoHandler}; use crate::{ macros::{import, py_static}, - utils::{generic, short_type_name, IntoPyErr, IntoPyResult, IntoPython, IntoRust}, + utils::{generic, short_type_name, IntoPyResult, IntoPython, IntoRust}, ZError, }; type RustCallback = zenoh::handlers::Callback; +/// See [`Python::check_signals`] documentation. +/// +/// Signals received by Python interpreter while executing Rust code in `allow_threads` +/// are not handled and kept as pending. It's Rust code responsibility to regularly check +/// them. Blocking calls like channel `recv` must then be done in a loop with small timeouts. const CHECK_SIGNALS_INTERVAL: Duration = Duration::from_millis(100); const DROP_CALLBACK_WARNING: &str = "Passing drop-callback using a tuple \ `(callback, drop-callback)` no longer works in 1.0;\n\ @@ -279,105 +284,32 @@ where _phantom: PhantomData, } -fn try_recv( - py: Python, - f: impl FnOnce() -> Result + Send, -) -> PyResult { - Ok(py.allow_threads(f).into_pyres()?.into_pyobject(py)) -} +macro_rules! impl_receiver { + ($($channel:ident),* $(,)?) => {$( + impl Receiver for RustHandler<$channel, T> { + fn type_name(&self) -> &'static str { + short_type_name::() + } -fn recv( - py: Python, - f: impl Fn() -> Result + Sync, - is_timeout: impl Fn(&E) -> bool, -) -> PyResult { - loop { - match py.allow_threads(&f) { - Ok(obj) => return Ok(obj.into_pyobject(py)), - Err(err) if is_timeout(&err) => py.check_signals()?, - Err(err) => return Err(err.into_pyerr()), - } - } -} + fn try_recv(&self, py: Python) -> PyResult { + Ok(self.handler.try_recv().into_pyres()?.into_pyobject(py)) + } -enum DeadlineError { - Timeout, - Error(E), -} -impl fmt::Display for DeadlineError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Error(err) => write!(f, "{err}"), - Self::Timeout => unreachable!(), + fn recv(&self, py: Python) -> PyResult { + // See `CHECK_SIGNALS_INTERVAL` doc + let recv_timeout = || self.handler.recv_timeout(CHECK_SIGNALS_INTERVAL); + loop { + match py.allow_threads(recv_timeout).into_pyres()? + { + Some(obj) => return Ok(obj.into_pyobject(py)), + None => py.check_signals()?, + } + } + } } - } -} - -impl Receiver for RustHandler { - fn type_name(&self) -> &'static str { - short_type_name::() - } - - fn try_recv(&self, py: Python) -> PyResult { - try_recv(py, || PyResult::Ok(self.handler.try_recv().ok())) - } - - fn recv(&self, py: Python) -> PyResult { - recv( - py, - || match self.handler.recv_timeout(CHECK_SIGNALS_INTERVAL) { - Ok(Some(x)) => Ok(x), - Ok(None) => Err(DeadlineError::Timeout), - Err(err) => Err(DeadlineError::Error(err)), - }, - |err| matches!(err, DeadlineError::Timeout), - ) - } -} - -impl Receiver for RustHandler { - fn type_name(&self) -> &'static str { - short_type_name::() - } - - fn try_recv(&self, py: Python) -> PyResult { - try_recv(py, || PyResult::Ok(self.handler.try_recv().ok())) - } - - fn recv(&self, py: Python) -> PyResult { - recv( - py, - || match self.handler.recv_timeout(CHECK_SIGNALS_INTERVAL) { - Ok(Some(x)) => Ok(x), - Ok(None) => Err(DeadlineError::Timeout), - Err(err) => Err(DeadlineError::Error(err)), - }, - |err| matches!(err, DeadlineError::Timeout), - ) - } -} - -impl Receiver for RustHandler { - fn type_name(&self) -> &'static str { - short_type_name::() - } - - fn try_recv(&self, py: Python) -> PyResult { - try_recv(py, || self.handler.try_recv()) - } - - fn recv(&self, py: Python) -> PyResult { - recv( - py, - || match self.handler.recv_timeout(CHECK_SIGNALS_INTERVAL) { - Ok(Some(x)) => Ok(x), - Ok(None) => Err(DeadlineError::Timeout), - Err(err) => Err(DeadlineError::Error(err)), - }, - |err| matches!(err, DeadlineError::Timeout), - ) - } + )*}; } +impl_receiver!(DefaultHandler, FifoChannel, RingChannel); fn rust_handler( py: Python,