From 68d7808898fd09cea15846af6068013f3fbaf013 Mon Sep 17 00:00:00 2001 From: Steve Meyer Date: Mon, 22 Nov 2021 16:20:08 -0800 Subject: [PATCH 01/13] riir --- Cargo.toml | 4 - build.rs | 47 -- libsettings_wrapper.h | 52 -- src/client.rs | 1347 ++++++++++++++++++----------------------- src/lib.rs | 11 +- src/settings.rs | 23 +- 6 files changed, 623 insertions(+), 861 deletions(-) delete mode 100644 libsettings_wrapper.h diff --git a/Cargo.toml b/Cargo.toml index 26291c4..de2cd3c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,7 +31,3 @@ serde_yaml = "0.8" [dev-dependencies] mockstream = { git = "https://github.com/lazy-bitfield/rust-mockstream", rev = "0.0.4" } - -[build-dependencies] -cmake = "0.1" -bindgen = "0.59" diff --git a/build.rs b/build.rs index 30324f9..a6ce651 100644 --- a/build.rs +++ b/build.rs @@ -2,55 +2,8 @@ use std::{env, fs, path::PathBuf}; fn main() { println!("cargo:rerun-if-changed=build.rs"); - let out_dir: PathBuf = env::var("OUT_DIR").expect("OUT_DIR was not set").into(); - let settings = fs::read_to_string("src/libsettings/settings.yaml").expect("failed to load settings"); fs::write(out_dir.join("settings.yaml"), settings).expect("failed to write settings"); - - let dst = cmake::Config::new("src/libsettings/") - .define("SKIP_UNIT_TESTS", "ON") - .define("BUILD_SHARED_LIBS", "OFF") - .define("libsettings_ENABLE_PYTHON", "OFF") - .define("CMAKE_INSTALL_PREFIX", &out_dir) - .build(); - - println!("cargo:rustc-link-search=native={}/lib", dst.display()); - println!("cargo:rustc-link-lib=static=sbp"); - println!("cargo:rustc-link-lib=static=settings"); - println!("cargo:rustc-link-lib=static=swiftnav"); - - let bindings = bindgen::Builder::default() - .header("./libsettings_wrapper.h") - .allowlist_function("settings_create") - .allowlist_function("settings_destroy") - .allowlist_function("settings_read_bool") - .allowlist_function("settings_read_by_idx") - .allowlist_function("settings_read_float") - .allowlist_function("settings_read_int") - .allowlist_function("settings_read_str") - .allowlist_function("settings_write_str") - .allowlist_type("settings_t") - .allowlist_type("sbp_msg_callback_t") - .allowlist_type("sbp_msg_callbacks_node_t") - .allowlist_type("settings_api_t") - .allowlist_type("settings_write_res_e_SETTINGS_WR_MODIFY_DISABLED") - .allowlist_type("settings_write_res_e_SETTINGS_WR_OK") - .allowlist_type("settings_write_res_e_SETTINGS_WR_PARSE_FAILED") - .allowlist_type("settings_write_res_e_SETTINGS_WR_READ_ONLY") - .allowlist_type("settings_write_res_e_SETTINGS_WR_SERVICE_FAILED") - .allowlist_type("settings_write_res_e_SETTINGS_WR_SETTING_REJECTED") - .allowlist_type("settings_write_res_e_SETTINGS_WR_TIMEOUT") - .allowlist_type("settings_write_res_e_SETTINGS_WR_VALUE_REJECTED") - .clang_arg(format!("-I{}/include", dst.display())) - .clang_arg(format!( - "-I{}/third_party/libswiftnav/include", - dst.display() - )) - .clang_arg(format!("-I{}/third_party/libsbp/c/include", dst.display())) - .generate() - .unwrap(); - - bindings.write_to_file(out_dir.join("bindings.rs")).unwrap() } diff --git a/libsettings_wrapper.h b/libsettings_wrapper.h deleted file mode 100644 index eec882e..0000000 --- a/libsettings_wrapper.h +++ /dev/null @@ -1,52 +0,0 @@ -#ifndef LIBSETTINGS_WRAPPER_H -#define LIBSETTINGS_WRAPPER_H - -typedef unsigned char uint8_t; -typedef signed char int8_t; - -typedef unsigned short uint16_t; -typedef signed short int16_t; - -typedef unsigned int uint32_t; -typedef signed int int32_t; - -#if defined(__GNUC__) && defined(__linux__) -typedef int int64_t __attribute__((__mode__(__DI__))); -typedef unsigned long int uint64_t; -#else -typedef long long int64_t; -typedef unsigned long long uint64_t; -#endif - -#define UINT8_MAX 255 - -typedef int8_t _s8; -typedef int16_t _s16; -typedef int32_t _s32; -typedef int64_t _s64; -typedef uint8_t _u8; -typedef uint16_t _u16; -typedef uint32_t _u32; -typedef uint64_t _u64; - -#define s8 _s8 -#define s16 _s16 -#define s32 _s32 -#define s64 _s64 -#define u8 _u8 -#define u16 _u16 -#define u32 _u32 -#define u64 _u64 - -#ifndef bool -#define bool _Bool -#endif - -#ifndef _BUILD_RUSTBIND_LIB_ -/* Put stuff here that shouldn't be seen during rustbindsettings build */ -#endif //_BUILD_RUSTBIND_LIB_ - -#define _RUSTC_BINDGEN_ 1 -#include "libsettings/settings.h" - -#endif diff --git a/src/client.rs b/src/client.rs index f315007..86ca37d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,373 +1,447 @@ use std::{ - convert::TryInto, - ffi::{self, CStr, CString}, - os::raw::{c_char, c_void}, - ptr, slice, - time::Duration, + borrow::Cow, + io, + sync::{ + atomic::{AtomicU16, AtomicUsize, Ordering}, + Arc, + }, + thread::JoinHandle, + time::{Duration, Instant}, }; -use crate::bindings::{ - sbp_msg_callback_t, sbp_msg_callbacks_node_t, settings_api_t, settings_create, - settings_destroy, settings_read_bool, settings_read_by_idx, settings_read_float, - settings_read_int, settings_read_str, settings_t, - settings_write_res_e_SETTINGS_WR_MODIFY_DISABLED, settings_write_res_e_SETTINGS_WR_OK, - settings_write_res_e_SETTINGS_WR_PARSE_FAILED, settings_write_res_e_SETTINGS_WR_READ_ONLY, - settings_write_res_e_SETTINGS_WR_SERVICE_FAILED, - settings_write_res_e_SETTINGS_WR_SETTING_REJECTED, settings_write_res_e_SETTINGS_WR_TIMEOUT, - settings_write_res_e_SETTINGS_WR_VALUE_REJECTED, settings_write_str, size_t, -}; -use crossbeam_channel::Receiver; +use crossbeam_channel::{Receiver, Sender}; use crossbeam_utils::thread; -use log::{debug, error, warn}; +use log::warn; +use parking_lot::Mutex; use sbp::{ - link::{Key, Link}, - Sbp, SbpMessage, + link::{Link, LinkSource}, + messages::settings::{ + MsgSettingsReadByIndexDone, MsgSettingsReadByIndexReq, MsgSettingsReadByIndexResp, + MsgSettingsReadReq, MsgSettingsReadResp, MsgSettingsWrite, MsgSettingsWriteResp, + }, + Sbp, SbpIterExt, }; use crate::{Setting, SettingKind, SettingValue}; const SENDER_ID: u16 = 0x42; -const NUM_WORKERS: usize = 5; +const NUM_WORKERS: usize = 10; pub struct Client<'a> { - inner: Box>, -} - -struct ClientInner<'a> { - context: *mut Context<'a>, - api: *mut settings_api_t, - ctx: *mut settings_t, -} - -impl Drop for ClientInner<'_> { - fn drop(&mut self) { - unsafe { - // Safety: self.api was created via Box::into_raw. - // self.context is freed via the call to settings_destroy - let _ = Box::from_raw(self.api); - settings_destroy(&mut self.ctx); - } - } + link: Link<'a, ()>, + sender: MsgSender, + handle: Option>, } -unsafe impl Send for ClientInner<'_> {} -unsafe impl Sync for ClientInner<'_> {} - impl<'a> Client<'a> { - pub fn new(link: Link<'a, ()>, sender: F) -> Client<'a> + pub fn new(reader: R, mut writer: W) -> Client<'static> where - F: FnMut(Sbp) -> Result<(), Box> + 'static, + R: io::Read + Send + 'static, + W: io::Write + Send + 'static, { - let context = Box::new(Context { - link, - sender: Box::new(sender), - callbacks: Vec::new(), - lock: Lock::new(), - event: None, + let source = LinkSource::new(); + let mut client = Client::<'static>::with_link(source.link(), move |msg| { + sbp::to_writer(&mut writer, &msg).map_err(Into::into) }); + client.handle = Some(std::thread::spawn(move || { + let messages = sbp::iter_messages(reader).log_errors(log::Level::Warn); + for msg in messages { + source.send(msg); + } + })); + client + } - let api = Box::new(settings_api_t { - ctx: ptr::null_mut(), - send: Some(libsettings_send), - send_from: Some(libsettings_send_from), - wait_init: Some(libsettings_wait_init), - wait: Some(libsettings_wait), - wait_deinit: None, - signal: Some(libsettings_signal), - wait_thd: Some(libsettings_wait_thd), - signal_thd: Some(libsettings_signal_thd), - lock: Some(libsettings_lock), - unlock: Some(libsettings_unlock), - register_cb: Some(libsettings_register_cb), - unregister_cb: Some(libsettings_unregister_cb), - log: None, - log_preformatted: Some(libsettings_log_wrapper), - }); - - let mut inner = Box::new(ClientInner { - context: ptr::null_mut(), - api: Box::into_raw(api), - ctx: ptr::null_mut(), - }); - - let context_raw = Box::into_raw(context); - inner.context = context_raw; - - // Safety: inner.api was just created via Box::into_raw so it is - // properly aligned and non-null - unsafe { - (*inner.api).ctx = context_raw as *mut c_void; + pub fn with_link(link: Link<'a, ()>, sender: F) -> Client<'a> + where + F: FnMut(Sbp) -> Result<(), BoxedError> + Send + 'static, + { + Self { + link, + sender: MsgSender(Arc::new(Mutex::new(Box::new(sender)))), + handle: None, } + } - inner.ctx = unsafe { settings_create(SENDER_ID, inner.api) }; - assert!(!inner.ctx.is_null()); - - Client { inner } + pub fn write_setting( + &self, + group: impl Into, + name: impl Into, + value: impl Into, + ) -> Result<(), Error> { + let (ctx, _ctx_handle) = Context::new(); + self.write_setting_ctx(group, name, value, ctx) } - pub fn read_all(&self) -> (Vec, Vec>) { - let (_, r) = crossbeam_channel::bounded(0); - self.read_all_inner(r) + pub fn write_setting_ctx( + &self, + group: impl Into, + name: impl Into, + value: impl Into, + ctx: Context, + ) -> Result<(), Error> { + self.write_setting_inner(group.into(), name.into(), value.into(), ctx) } - pub fn read_all_timeout( + pub fn read_setting( &self, - timeout: Duration, - ) -> (Vec, Vec>) { - let (done_s, done_r) = crossbeam_channel::bounded(0); - let (stop_s, stop_r) = crossbeam_channel::bounded(NUM_WORKERS); - thread::scope(move |scope| { - scope.spawn(move |_| { - crossbeam_channel::select! { - recv(done_r) -> _ => return, - default(timeout) => (), - }; - warn!("settings read_all timed out"); - for _ in 0..NUM_WORKERS { - let _ = stop_s.try_send(()); - } - }); - let res = self.read_all_inner(stop_r); - let _ = done_s.send(()); - res - }) - .unwrap() + group: impl Into, + name: impl Into, + ) -> Result, Error> { + let (ctx, _ctx_handle) = Context::new(); + self.read_setting_ctx(group, name, ctx) } - fn read_all_inner( + pub fn read_setting_ctx( &self, - stop: Receiver<()>, - ) -> (Vec, Vec>) { - thread::scope(move |scope| { - let (idx_s, idx_r) = crossbeam_channel::bounded(NUM_WORKERS); - let (settings_s, settings_r) = crossbeam_channel::unbounded(); - let (errors_s, errors_r) = crossbeam_channel::unbounded(); - - scope.spawn(move |_| { - let mut idx = 0; - while idx_s.send(idx).is_ok() { - idx += 1; - } - }); + group: impl Into, + name: impl Into, + ctx: Context, + ) -> Result, Error> { + self.read_setting_inner(group.into(), name.into(), ctx) + } + + pub fn read_all(&self) -> (Vec, Vec>) { + let (ctx, _ctx_handle) = Context::new(); + self.read_all_ctx(ctx) + } + pub fn read_all_ctx(&self, ctx: Context) -> (Vec, Vec>) { + self.read_all_inner(ctx) + } + + fn read_all_inner(&self, ctx: Context) -> (Vec, Vec>) { + let (done_tx, done_rx) = crossbeam_channel::bounded(NUM_WORKERS); + let key = self.link.register(move |_: MsgSettingsReadByIndexDone| { + if !done_tx.is_empty() { + return; + } for _ in 0..NUM_WORKERS { - scope.spawn({ - let idx_r = idx_r.clone(); - let settings_s = settings_s.clone(); - let errors_s = errors_s.clone(); - let stop = stop.clone(); - move |_| { - for idx in idx_r.iter() { - if stop.try_recv().is_ok() { - break; - } - match self.read_by_index(idx) { - Ok(Some(setting)) => settings_s.send(setting).unwrap(), - Ok(None) => break, - Err(err) => { - errors_s.send(Error::Err(err)).unwrap(); - break; - } - } - } - } + done_tx.try_send(()).unwrap(); + } + }); + let (settings, errors) = (Mutex::new(Vec::new()), Mutex::new(Vec::new())); + let idx = AtomicU16::new(0); + thread::scope(|scope| { + for _ in 0..NUM_WORKERS { + let idx = &idx; + let settings = &settings; + let errors = &errors; + let done_rx = &done_rx; + let ctx = ctx.clone(); + scope.spawn(move |_| { + self.worker_thd(idx, settings, errors, done_rx, ctx); }); } - // we need to drop these before calling `.iter()` to disconnect the channels - // otherwise `.iter()` will never terminate - drop(idx_r); - drop(settings_s); - drop(errors_s); - (settings_r.iter().collect(), errors_r.iter().collect()) }) - .expect("read_all worker thread panicked") - } - - pub fn read_by_index(&self, idx: u16) -> Result, ReadSettingError> { - const BUF_SIZE: usize = 255; - - let mut section = Vec::::with_capacity(BUF_SIZE); - let mut name = Vec::::with_capacity(BUF_SIZE); - let mut value = Vec::::with_capacity(BUF_SIZE); - let mut fmt_type = Vec::::with_capacity(BUF_SIZE); - let mut event = Event::new(); - - let status = unsafe { - settings_read_by_idx( - self.inner.ctx, - &mut event as *mut Event as *mut _, - idx, - section.as_mut_ptr(), - BUF_SIZE as size_t, - name.as_mut_ptr(), - BUF_SIZE as size_t, - value.as_mut_ptr(), - BUF_SIZE as size_t, - fmt_type.as_mut_ptr(), - BUF_SIZE as size_t, - ) - }; + .expect("read_all worker thread panicked"); + self.link.unregister(key); + settings.lock().sort_by_key(|(idx, _)| *idx); + errors.lock().sort_by_key(|(idx, _)| *idx); + ( + settings.into_inner().into_iter().map(|e| e.1).collect(), + errors.into_inner().into_iter().map(|e| e.1).collect(), + ) + } - match status { - 0 => Ok(Some(unsafe { - ReadByIdxResult { - group: CStr::from_ptr(section.as_ptr()) - .to_string_lossy() - .into_owned(), - name: CStr::from_ptr(name.as_ptr()).to_string_lossy().into_owned(), - value: CStr::from_ptr(value.as_ptr()) - .to_string_lossy() - .into_owned(), - fmt_type: CStr::from_ptr(fmt_type.as_ptr()) - .to_string_lossy() - .into_owned(), + fn worker_thd( + &self, + idx: &AtomicU16, + settings: &Mutex>, + errors: &Mutex)>>, + done_rx: &Receiver<()>, + ctx: Context, + ) { + loop { + let idx = idx.fetch_add(1, Ordering::SeqCst); + match self.read_by_index(idx, done_rx, &ctx) { + Ok(Some(setting)) => { + settings.lock().push((idx, setting)); + } + Ok(None) => { + break; + } + Err(err) => { + let exit = matches!(err, Error::TimedOut | Error::Canceled); + errors.lock().push((idx, err)); + if exit { + break; + } } - })), - status if status > 0 => Ok(None), - status => Err(status.into()), + } } } - pub fn read_setting( + fn read_by_index( &self, - group: impl AsRef, - name: impl AsRef, - ) -> Option>> { - self.read_setting_inner(group.as_ref(), name.as_ref()) + idx: u16, + done_rx: &Receiver<()>, + ctx: &Context, + ) -> Result, Error> { + let (tx, rx) = crossbeam_channel::bounded(1); + let key = self.link.register(move |msg: MsgSettingsReadByIndexResp| { + if idx != msg.index { + return; + } + let setting = msg.setting.to_string(); + let mut fields = setting.split('\0'); + let group = fields.next().unwrap_or_default(); + let name = fields.next().unwrap_or_default(); + let entry = Setting::find(group, name); + if let Some(entry) = entry { + let value = fields + .next() + .and_then(|s| SettingValue::parse(s, entry.kind)); + let fmt_type = fields.next(); + if entry.kind == SettingKind::Enum { + if let Some(fmt_type) = fmt_type { + let mut parts = fmt_type.splitn(2, ':'); + let possible_values = parts.nth(1); + if let Some(p) = possible_values { + let mut entry = entry.clone(); + entry.enumerated_possible_values = Some(p.to_owned()); + let _ = tx.send(ReadResp { + entry: Cow::Owned(entry), + value, + }); + return; + } + } + } + let _ = tx.send(ReadResp { + entry: Cow::Borrowed(entry), + value, + }); + } else { + warn!( + "No settings documentation entry or name: {} in group: {}", + name, group + ); + let entry: Cow = Cow::Owned(Setting { + group: group.to_owned(), + name: name.to_owned(), + ..Default::default() + }); + let value = fields + .next() + .and_then(|s| SettingValue::parse(s, entry.kind)); + let _ = tx.send(ReadResp { entry, value }); + } + }); + self.sender.send(MsgSettingsReadByIndexReq { + sender_id: Some(SENDER_ID), + index: idx, + })?; + let res = crossbeam_channel::select! { + recv(rx) -> msg => Ok(Some(msg.unwrap())), + recv(done_rx) -> _ => Ok(None), + recv(ctx.timeout_rx) -> _ => Err(Error::TimedOut), + recv(ctx.cancel_rx) -> _ => Err(Error::Canceled), + }; + self.link.unregister(key); + res } fn read_setting_inner( &self, - group: &str, - name: &str, - ) -> Option>> { - let setting = Setting::find(group, name)?; - - let cgroup = match CString::new(group) { - Ok(cgroup) => cgroup, - Err(e) => return Some(Err(e.into())), + group: String, + name: String, + ctx: Context, + ) -> Result, Error> { + let entry = match Setting::find(&group, &name) { + Some(setting) => setting, + None => return Ok(None), }; - - let cname = match CString::new(name) { - Ok(cname) => cname, - Err(e) => return Some(Err(e.into())), + let read_req = MsgSettingsReadReq { + sender_id: Some(SENDER_ID), + setting: format!("{}\0{}\0", group, name).into(), }; + let (tx, rx) = crossbeam_channel::bounded(1); + let key = self.link.register(move |msg: MsgSettingsReadResp| { + let setting = msg.setting.to_string(); + let mut fields = setting.split('\0'); + if fields.next() == Some(&group) && fields.next() == Some(&name) { + match fields.next() { + Some(value) => { + let _ = tx.try_send(Some(ReadResp { + entry: Cow::Borrowed(entry), + value: SettingValue::parse(value, entry.kind), + })); + } + None => { + let _ = tx.try_send(None); + } + } + } + }); + self.sender.send(read_req)?; + let res = crossbeam_channel::select! { + recv(rx) -> msg => Ok(msg.unwrap()), + recv(ctx.timeout_rx) -> _ => Err(Error::TimedOut), + recv(ctx.cancel_rx) -> _ => Err(Error::Canceled), + }; + self.link.unregister(key); + res + } - let (res, status) = match setting.kind { - SettingKind::Integer => unsafe { - let mut value: i32 = 0; - let status = - settings_read_int(self.inner.ctx, cgroup.as_ptr(), cname.as_ptr(), &mut value); - (SettingValue::Integer(value), status) - }, - SettingKind::Boolean => unsafe { - let mut value: bool = false; - let status = - settings_read_bool(self.inner.ctx, cgroup.as_ptr(), cname.as_ptr(), &mut value); - (SettingValue::Boolean(value), status) - }, - SettingKind::Float | SettingKind::Double => unsafe { - let mut value: f32 = 0.0; - let status = settings_read_float( - self.inner.ctx, - cgroup.as_ptr(), - cname.as_ptr(), - &mut value, - ); - (SettingValue::Float(value), status) - }, - SettingKind::String | SettingKind::Enum | SettingKind::PackedBitfield => unsafe { - let mut buf = Vec::::with_capacity(1024); - let status = settings_read_str( - self.inner.ctx, - cgroup.as_ptr(), - cname.as_ptr(), - buf.as_mut_ptr(), - buf.capacity().try_into().unwrap(), - ); - ( - SettingValue::String( - CStr::from_ptr(buf.as_ptr()).to_string_lossy().into_owned(), - ), - status, - ) - }, + fn write_setting_inner( + &self, + group: String, + name: String, + value: String, + ctx: Context, + ) -> Result<(), Error> { + let write_req = MsgSettingsWrite { + sender_id: Some(SENDER_ID), + setting: format!("{}\0{}\0{}\0", group, name, value).into(), }; + let (tx, rx) = crossbeam_channel::bounded(1); + let key = self.link.register(move |msg: MsgSettingsWriteResp| { + let setting = msg.setting.to_string(); + let mut fields = setting.split('\0'); + if fields.next() == Some(&group) && fields.next() == Some(&name) { + if msg.status == 0 { + let _ = tx.send(Ok(())); + } else { + let _ = tx.send(Err(Error::Err(WriteSettingError::from(msg.status)))); + } + } + }); + self.sender.send(write_req)?; + let res = crossbeam_channel::select! { + recv(rx) -> msg => msg.unwrap(), + recv(ctx.timeout_rx) -> _ => Err(Error::TimedOut), + recv(ctx.cancel_rx) -> _ => Err(Error::Canceled), + }; + self.link.unregister(key); + res + } +} - debug!("{} {} {:?} {}", setting.group, setting.name, res, status); +#[derive(Debug, Clone, PartialEq)] +pub struct ReadResp { + pub entry: Cow<'static, Setting>, + pub value: Option, +} - if status == 0 { - Some(Ok(res)) - } else { - Some(Err(status.into())) - } +pub struct Context { + cancel_rx: Receiver<()>, + timeout_rx: Receiver, + timeout_at: Instant, + shared: Arc, +} + +impl Context { + pub fn new() -> (Context, CtxHandle) { + let (cancel_tx, cancel_rx) = crossbeam_channel::unbounded(); + let timeout_rx = crossbeam_channel::never(); + let timeout_at = Instant::now() + Duration::from_secs(60 * 60 * 24); + let shared = Arc::new(CtxShared::new(cancel_tx)); + ( + Context { + cancel_rx, + timeout_rx, + timeout_at, + shared: Arc::clone(&shared), + }, + CtxHandle { shared }, + ) } - pub fn write_setting( - &self, - group: impl AsRef, - name: impl AsRef, - value: impl AsRef, - ) -> Result<(), Error> { - self.write_setting_inner(group.as_ref(), name.as_ref(), value.as_ref()) + pub fn with_timeout(timeout: Duration) -> (Context, CtxHandle) { + let (mut ctx, h) = Self::new(); + ctx.set_timeout(timeout); + (ctx, h) } - fn write_setting_inner( - &self, - group: &str, - name: &str, - value: &str, - ) -> Result<(), Error> { - let cgroup = CString::new(group)?; - let cname = CString::new(name)?; - let cvalue = CString::new(value)?; - let mut event = Event::new(); - - let result = unsafe { - settings_write_str( - self.inner.ctx, - &mut event as *mut Event as *mut _, - cgroup.as_ptr(), - cname.as_ptr(), - cvalue.as_ptr(), - ) - }; + pub fn set_timeout(&mut self, timeout: Duration) { + self.timeout_at = Instant::now() + timeout; + self.timeout_rx = crossbeam_channel::at(self.timeout_at); + } - #[allow(non_upper_case_globals)] - match result { - settings_write_res_e_SETTINGS_WR_OK => Ok(()), - code => Err(code.into()), + pub fn cancel(&self) { + self.shared.cancel(); + } +} + +impl Clone for Context { + fn clone(&self) -> Self { + self.shared.num_chans.fetch_add(1, Ordering::SeqCst); + Self { + cancel_rx: self.cancel_rx.clone(), + timeout_rx: crossbeam_channel::at(self.timeout_at), + timeout_at: self.timeout_at, + shared: Arc::clone(&self.shared), } } } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum Error { - NulError, - Err(E), +pub struct CtxHandle { + shared: Arc, } -impl From for Error { - fn from(_: ffi::NulError) -> Self { - Error::NulError +impl CtxHandle { + pub fn cancel(&self) { + self.shared.cancel(); } } -impl From for Error -where - E: From, -{ - fn from(err: u32) -> Self { - Error::Err(err.into()) +struct CtxShared { + cancel_tx: Sender<()>, + num_chans: AtomicUsize, +} + +impl CtxShared { + fn new(cancel_tx: Sender<()>) -> Self { + Self { + cancel_tx, + num_chans: AtomicUsize::new(1), + } + } + + fn cancel(&self) { + for _ in 0..self.num_chans.load(Ordering::SeqCst) { + let _ = self.cancel_tx.try_send(()); + } } } -impl From for Error -where - E: From, -{ - fn from(err: i32) -> Self { - Error::Err(err.into()) +type BoxedError = Box; + +type SenderFunc = Box Result<(), BoxedError> + Send>; + +struct MsgSender(Arc>); + +impl MsgSender { + const RETRIES: usize = 5; + const TIMEOUT: Duration = Duration::from_millis(500); + + fn send(&self, msg: impl Into) -> Result<(), BoxedError> { + self.send_inner(msg.into(), 0) + } + + fn send_inner(&self, msg: Sbp, tries: usize) -> Result<(), BoxedError> { + let res = (self.0.lock())(msg.clone()); + if res.is_err() && tries < Self::RETRIES { + std::thread::sleep(Self::TIMEOUT); + self.send_inner(msg, tries + 1) + } else { + res + } + } +} + +#[derive(Debug)] +pub enum Error { + Err(E), + SenderError(BoxedError), + TimedOut, + Canceled, +} + +impl From for Error { + fn from(v: BoxedError) -> Self { + Self::SenderError(v) } } @@ -377,22 +451,16 @@ where { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Error::NulError => write!(f, "error converting to CString"), Error::Err(err) => write!(f, "{}", err), + Error::TimedOut => write!(f, "timed out"), + Error::Canceled => write!(f, "canceled"), + Error::SenderError(err) => write!(f, "{}", err), } } } impl std::error::Error for Error where E: std::error::Error {} -#[derive(Debug, Clone)] -pub struct ReadByIdxResult { - pub group: String, - pub name: String, - pub value: String, - pub fmt_type: String, -} - #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum WriteSettingError { ValueRejected, @@ -405,31 +473,21 @@ pub enum WriteSettingError { Unknown, } -impl From for WriteSettingError { - fn from(n: u32) -> Self { - #[cfg(target_os = "windows")] - let n = n as i32; - - #[allow(non_upper_case_globals)] +impl From for WriteSettingError { + fn from(n: u8) -> Self { match n { - settings_write_res_e_SETTINGS_WR_VALUE_REJECTED => WriteSettingError::ValueRejected, - settings_write_res_e_SETTINGS_WR_SETTING_REJECTED => WriteSettingError::SettingRejected, - settings_write_res_e_SETTINGS_WR_PARSE_FAILED => WriteSettingError::ParseFailed, - settings_write_res_e_SETTINGS_WR_READ_ONLY => WriteSettingError::ReadOnly, - settings_write_res_e_SETTINGS_WR_MODIFY_DISABLED => WriteSettingError::ModifyDisabled, - settings_write_res_e_SETTINGS_WR_SERVICE_FAILED => WriteSettingError::ServiceFailed, - settings_write_res_e_SETTINGS_WR_TIMEOUT => WriteSettingError::Timeout, + 1 => WriteSettingError::ValueRejected, + 2 => WriteSettingError::SettingRejected, + 3 => WriteSettingError::ParseFailed, + 4 => WriteSettingError::ReadOnly, + 5 => WriteSettingError::ModifyDisabled, + 6 => WriteSettingError::ServiceFailed, + 7 => WriteSettingError::Timeout, _ => WriteSettingError::Unknown, } } } -impl From for WriteSettingError { - fn from(n: i32) -> Self { - (n as u32).into() - } -} - impl std::fmt::Display for WriteSettingError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -459,470 +517,265 @@ impl std::fmt::Display for WriteSettingError { impl std::error::Error for WriteSettingError {} -#[derive(Debug, Clone, Copy)] -pub struct ReadSettingError { - code: u32, -} - -impl From for ReadSettingError { - fn from(code: u32) -> Self { - Self { code } - } -} - -impl From for ReadSettingError { - fn from(code: i32) -> Self { - (code as u32).into() - } -} +#[derive(Debug)] +pub struct ReadSettingError {} impl std::fmt::Display for ReadSettingError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "settings read failed with status code {}", self.code) + write!(f, "settings read failed") } } impl std::error::Error for ReadSettingError {} -type SenderFunc = Box Result<(), Box>>; - -#[repr(C)] -struct Context<'a> { - link: Link<'a, ()>, - sender: SenderFunc, - callbacks: Vec, - event: Option, - lock: Lock, -} - -impl Context<'_> { - fn callback_broker(&self, msg: Sbp) { - let cb_data = { - let _guard = self.lock.lock(); - let idx = if let Some(idx) = self - .callbacks - .iter() - .position(|cb| cb.msg_type == msg.message_type()) - { - idx - } else { - error!( - "callback not registered for message type {}", - msg.message_type() - ); - return; - }; - self.callbacks[idx] - }; - let mut frame = match sbp::to_vec(&msg) { - Ok(frame) => frame, - Err(e) => { - error!("failed to frame sbp message {}", e); - return; - } - }; - let frame_len = frame.len(); - let payload = &mut frame[sbp::HEADER_LEN..frame_len - sbp::CRC_LEN]; - let cb = match cb_data.cb { - Some(cb) => cb, - None => { - error!("provided callback was none"); - return; - } - }; - let cb_context = cb_data.cb_context; - unsafe { - cb( - msg.sender_id().unwrap_or(0), - payload.len() as u8, - payload.as_mut_ptr(), - cb_context, - ) - }; - } -} - -unsafe impl Send for Context<'_> {} -unsafe impl Sync for Context<'_> {} - -#[repr(C)] -#[derive(Clone, Copy)] -struct Callback { - node: usize, - msg_type: u16, - cb: sbp_msg_callback_t, - cb_context: *mut c_void, - key: Key, -} - -#[repr(C)] -struct Event { - condvar: parking_lot::Condvar, - lock: parking_lot::Mutex<()>, -} - -impl Event { - fn new() -> Self { - Self { - condvar: parking_lot::Condvar::new(), - lock: parking_lot::Mutex::new(()), - } - } - - fn wait_timeout(&self, ms: i32) -> bool { - let mut started = self.lock.lock(); - let result = self - .condvar - .wait_for(&mut started, Duration::from_millis(ms as u64)); - !result.timed_out() - } - - fn set(&self) { - let _ = self.lock.lock(); - if !self.condvar.notify_one() { - warn!("event set did not notify anything"); - } - } -} - -#[repr(C)] -struct Lock(parking_lot::Mutex<()>); - -impl Lock { - fn new() -> Self { - Self(parking_lot::Mutex::new(())) - } - - fn lock(&self) -> parking_lot::MutexGuard<()> { - self.0.lock() - } - - fn acquire(&self) { - std::mem::forget(self.0.lock()); - } - - fn release(&self) { - // Safety: Only called via libsettings_unlock after libsettings_lock was called - unsafe { self.0.force_unlock() } - } -} - -struct CtxPtr(*mut c_void); - -unsafe impl Sync for CtxPtr {} -unsafe impl Send for CtxPtr {} - -#[no_mangle] -/// Thread safety: Should be called with lock already held -unsafe extern "C" fn libsettings_register_cb( - ctx: *mut c_void, - msg_type: u16, - cb: sbp_msg_callback_t, - cb_context: *mut c_void, - node: *mut *mut sbp_msg_callbacks_node_t, -) -> i32 { - let context: &mut Context = &mut *(ctx as *mut _); - let ctx_ptr = CtxPtr(ctx); - let key = context.link.register_by_id(&[msg_type], move |msg: Sbp| { - let context: &mut Context = &mut *(ctx_ptr.0 as *mut _); - context.callback_broker(msg) - }); - context.callbacks.push(Callback { - node: node as usize, - msg_type, - cb, - cb_context, - key, - }); - 0 -} - -#[no_mangle] -/// Thread safety: Should be called with lock already held -unsafe extern "C" fn libsettings_unregister_cb( - ctx: *mut c_void, - node: *mut *mut sbp_msg_callbacks_node_t, -) -> i32 { - let context: &mut Context = &mut *(ctx as *mut _); - if (node as i32) == 0 { - return -127; - } - let idx = match context - .callbacks - .iter() - .position(|cb| cb.node == node as usize) - { - Some(idx) => idx, - None => return -127, - }; - let key = context.callbacks.remove(idx).key; - context.link.unregister(key); - 0 -} +#[cfg(test)] +mod tests { + use std::io::{Read, Write}; + use std::time::Instant; -#[no_mangle] -unsafe extern "C" fn libsettings_send( - ctx: *mut c_void, - msg_type: u16, - len: u8, - payload: *mut u8, -) -> i32 { - libsettings_send_from(ctx, msg_type, len, payload, 0) -} + use super::*; -#[no_mangle] -unsafe extern "C" fn libsettings_send_from( - ctx: *mut ::std::os::raw::c_void, - msg_type: u16, - len: u8, - payload: *mut u8, - sender_id: u16, -) -> i32 { - let context: &mut Context = &mut *(ctx as *mut _); - let msg = match Sbp::from_frame(sbp::Frame { - msg_type, - sender_id, - payload: slice::from_raw_parts(payload, len as usize), - }) { - Ok(msg) => msg, - Err(err) => { - error!("error parsing message: {}", err); - return -1; - } - }; - if let Err(err) = (context.sender)(msg) { - error!("failed to send message: {}", err); - return -1; - }; - 0 -} + use crossbeam_utils::thread::scope; + use sbp::messages::settings::{MsgSettingsReadReq, MsgSettingsReadResp}; + use sbp::{SbpMessage, SbpString}; -#[no_mangle] -extern "C" fn libsettings_wait(ctx: *mut c_void, timeout_ms: i32) -> i32 { - assert!(timeout_ms > 0); - let context: &mut Context = unsafe { &mut *(ctx as *mut _) }; - if context.event.as_ref().unwrap().wait_timeout(timeout_ms) { - 0 - } else { - -1 + #[test] + fn test_should_retry() { + let (group, name) = ("sbp", "obs_msg_max_size"); + let mut mock = Mock::with_errors(5); + mock.req_reply( + MsgSettingsReadReq { + sender_id: Some(SENDER_ID), + setting: format!("{}\0{}\0", group, name).into(), + }, + MsgSettingsReadResp { + sender_id: Some(0x42), + setting: format!("{}\0{}\010\0", group, name).into(), + }, + ); + let (reader, writer) = mock.into_io(); + let client = Client::new(reader, writer); + let response = client.read_setting(group, name).unwrap().unwrap(); + assert!(matches!(response.value, Some(SettingValue::Integer(10)))); } -} -#[no_mangle] -extern "C" fn libsettings_wait_thd(event: *mut c_void, timeout_ms: i32) -> i32 { - assert!(timeout_ms > 0); - let event: &Event = unsafe { &*(event as *const _) }; - if event.wait_timeout(timeout_ms) { - 0 - } else { - -1 + #[test] + fn read_setting_timeout() { + let (group, name) = ("sbp", "obs_msg_max_size"); + let mock = Mock::new(); + let (reader, writer) = mock.into_io(); + let client = Client::new(reader, writer); + let (ctx, _ctx_handle) = Context::with_timeout(Duration::from_millis(100)); + let now = Instant::now(); + let mut response1 = Ok(None); + let mut response2 = Ok(None); + scope(|scope| { + scope.spawn({ + let ctx = ctx.clone(); + |_| { + response1 = client.read_setting_ctx(group, name, ctx); + } + }); + scope.spawn(|_| { + response2 = client.read_setting_ctx(group, name, ctx); + }); + }) + .unwrap(); + assert!(now.elapsed().as_millis() >= 100); + assert!(matches!(response1, Err(Error::TimedOut))); + assert!(matches!(response2, Err(Error::TimedOut))); } -} - -#[no_mangle] -extern "C" fn libsettings_signal_thd(event: *mut c_void) { - let event: &Event = unsafe { &*(event as *const _) }; - event.set(); -} - -#[no_mangle] -extern "C" fn libsettings_signal(ctx: *mut c_void) { - let context: &Context = unsafe { &*(ctx as *const _) }; - context.event.as_ref().unwrap().set(); -} - -#[no_mangle] -extern "C" fn libsettings_lock(ctx: *mut c_void) { - let context: &Context = unsafe { &*(ctx as *const _) }; - context.lock.acquire(); -} -#[no_mangle] -extern "C" fn libsettings_unlock(ctx: *mut c_void) { - let context: &Context = unsafe { &*(ctx as *const _) }; - context.lock.release(); -} - -#[no_mangle] -extern "C" fn libsettings_wait_init(ctx: *mut c_void) -> i32 { - let context: &mut Context = unsafe { &mut *(ctx as *mut _) }; - context.event = Some(Event::new()); - 0 -} - -#[no_mangle] -extern "C" fn libsettings_log_wrapper( - priority: ::std::os::raw::c_int, - format: *const ::std::os::raw::c_char, -) { - let level = match priority { - i32::MIN..=3 => log::Level::Error, - 4 => log::Level::Warn, - 5 => log::Level::Info, - 6 => log::Level::Info, - 7.. => log::Level::Debug, - }; - let msg = unsafe { CStr::from_ptr(format) }; - let msg = msg.to_string_lossy(); - log::log!(level, "{}", msg); -} - -#[cfg(test)] -mod tests { - use super::*; - - use std::io::{Read, Write}; - - use crossbeam_utils::thread::scope; - use sbp::link::LinkSource; - use sbp::messages::settings::{MsgSettingsReadReq, MsgSettingsReadResp}; - use sbp::{SbpIterExt, SbpString}; - - static SETTINGS_SENDER_ID: u16 = 0x42; - - fn read_setting( - rdr: impl Read + Send, - mut wtr: impl Write + 'static, - group: &str, - name: &str, - ) -> Option>> { - scope(move |scope| { - let source = LinkSource::new(); - let link = source.link(); - scope.spawn(move |_| { - let messages = sbp::iter_messages(rdr).handle_errors(|e| panic!("{}", e)); - for message in messages { - source.send(message); + #[test] + fn read_setting_cancel() { + let (group, name) = ("sbp", "obs_msg_max_size"); + let mock = Mock::new(); + let (reader, writer) = mock.into_io(); + let client = Client::new(reader, writer); + let (ctx, ctx_handle) = Context::new(); + let now = Instant::now(); + let mut response1 = Ok(None); + let mut response2 = Ok(None); + scope(|scope| { + scope.spawn({ + let ctx = ctx.clone(); + |_| { + response1 = client.read_setting_ctx(group, name, ctx); } }); - let client = Client::new(link, move |msg| { - sbp::to_writer(&mut wtr, &msg).map_err(Into::into) + scope.spawn(|_| { + response2 = client.read_setting_ctx(group, name, ctx); }); - client.read_setting(group, name) + std::thread::sleep(Duration::from_millis(100)); + ctx_handle.cancel(); }) - .unwrap() + .unwrap(); + assert!(now.elapsed().as_millis() >= 100); + assert!(matches!(response1, Err(Error::Canceled))); + assert!(matches!(response2, Err(Error::Canceled))); } #[test] fn mock_read_setting_int() { let (group, name) = ("sbp", "obs_msg_max_size"); - let mut stream = mockstream::SyncMockStream::new(); - - let request_msg = MsgSettingsReadReq { - sender_id: Some(SETTINGS_SENDER_ID), - setting: SbpString::from(format!("{}\0{}\0", group, name)), - }; - - stream.wait_for(sbp::to_vec(&request_msg).unwrap().as_ref()); - - let reply_msg = MsgSettingsReadResp { - sender_id: Some(0x42), - setting: SbpString::from(format!("{}\0{}\010\0", group, name)), - }; - - stream.push_bytes_to_read(sbp::to_vec(&reply_msg).unwrap().as_ref()); - - let response = read_setting(stream.clone(), stream, group, name); - - assert!(matches!(response, Some(Ok(SettingValue::Integer(10))))); + let mut mock = Mock::new(); + mock.req_reply( + MsgSettingsReadReq { + sender_id: Some(SENDER_ID), + setting: format!("{}\0{}\0", group, name).into(), + }, + MsgSettingsReadResp { + sender_id: Some(0x42), + setting: format!("{}\0{}\010\0", group, name).into(), + }, + ); + let (reader, writer) = mock.into_io(); + let client = Client::new(reader, writer); + let response = client.read_setting(group, name).unwrap().unwrap(); + assert!(matches!(response.value, Some(SettingValue::Integer(10)))); } #[test] fn mock_read_setting_bool() { let (group, name) = ("surveyed_position", "broadcast"); - let mut stream = mockstream::SyncMockStream::new(); - - let request_msg = MsgSettingsReadReq { - sender_id: Some(SETTINGS_SENDER_ID), - setting: SbpString::from(format!("{}\0{}\0", group, name)), - }; - - stream.wait_for(sbp::to_vec(&request_msg).unwrap().as_ref()); - - let reply_msg = MsgSettingsReadResp { - sender_id: Some(0x42), - setting: SbpString::from(format!("{}\0{}\0True\0", group, name)), - }; - - stream.push_bytes_to_read(sbp::to_vec(&reply_msg).unwrap().as_ref()); - - let response = read_setting(stream.clone(), stream, group, name); - - assert!(matches!(response, Some(Ok(SettingValue::Boolean(true))))); + let mut mock = Mock::new(); + mock.req_reply( + MsgSettingsReadReq { + sender_id: Some(SENDER_ID), + setting: format!("{}\0{}\0", group, name).into(), + }, + MsgSettingsReadResp { + sender_id: Some(0x42), + setting: SbpString::from(format!("{}\0{}\0True\0", group, name)), + }, + ); + let (reader, writer) = mock.into_io(); + let client = Client::new(reader, writer); + let response = client.read_setting(group, name).unwrap().unwrap(); + assert!(matches!(response.value, Some(SettingValue::Boolean(true)))); } #[test] fn mock_read_setting_double() { let (group, name) = ("surveyed_position", "surveyed_lat"); - let mut stream = mockstream::SyncMockStream::new(); - - let request_msg = MsgSettingsReadReq { - sender_id: Some(SETTINGS_SENDER_ID), - setting: SbpString::from(format!("{}\0{}\0", group, name)), - }; - - stream.wait_for(sbp::to_vec(&request_msg).unwrap().as_ref()); - - let reply_msg = MsgSettingsReadResp { - sender_id: Some(SETTINGS_SENDER_ID), - setting: SbpString::from(format!("{}\0{}\00.1\0", group, name)), - }; - - stream.push_bytes_to_read(sbp::to_vec(&reply_msg).unwrap().as_ref()); - - let response = read_setting(stream.clone(), stream, group, name) - .unwrap() - .unwrap(); - assert_eq!(response, SettingValue::Float(0.1)); + let mut mock = Mock::new(); + mock.req_reply( + MsgSettingsReadReq { + sender_id: Some(SENDER_ID), + setting: SbpString::from(format!("{}\0{}\0", group, name)), + }, + MsgSettingsReadResp { + sender_id: Some(SENDER_ID), + setting: SbpString::from(format!("{}\0{}\00.1\0", group, name)), + }, + ); + let (reader, writer) = mock.into_io(); + let client = Client::new(reader, writer); + let response = client.read_setting(group, name).unwrap().unwrap(); + assert_eq!(response.value, Some(SettingValue::Float(0.1))); } #[test] fn mock_read_setting_string() { let (group, name) = ("rtcm_out", "ant_descriptor"); - let mut stream = mockstream::SyncMockStream::new(); - - let request_msg = MsgSettingsReadReq { - sender_id: Some(0x42), - setting: SbpString::from(format!("{}\0{}\0", group, name)), - }; - - stream.wait_for(sbp::to_vec(&request_msg).unwrap().as_ref()); - - let reply_msg = MsgSettingsReadResp { - sender_id: Some(SETTINGS_SENDER_ID), - setting: SbpString::from(format!("{}\0{}\0foo\0", group, name)), - }; - - stream.push_bytes_to_read(sbp::to_vec(&reply_msg).unwrap().as_ref()); - - let response = read_setting(stream.clone(), stream, group, name) - .unwrap() - .unwrap(); - assert_eq!(response, SettingValue::String("foo".into())); + let mut mock = Mock::new(); + mock.req_reply( + MsgSettingsReadReq { + sender_id: Some(0x42), + setting: SbpString::from(format!("{}\0{}\0", group, name)), + }, + MsgSettingsReadResp { + sender_id: Some(SENDER_ID), + setting: SbpString::from(format!("{}\0{}\0foo\0", group, name)), + }, + ); + let (reader, writer) = mock.into_io(); + let client = Client::new(reader, writer); + let response = client.read_setting(group, name).unwrap().unwrap(); + assert_eq!(response.value, Some(SettingValue::String("foo".into()))); } #[test] fn mock_read_setting_enum() { let (group, name) = ("frontend", "antenna_selection"); - let mut stream = mockstream::SyncMockStream::new(); + let mut mock = Mock::new(); + mock.req_reply( + MsgSettingsReadReq { + sender_id: Some(SENDER_ID), + setting: SbpString::from(format!("{}\0{}\0", group, name)), + }, + MsgSettingsReadResp { + sender_id: Some(0x42), + setting: SbpString::from(format!("{}\0{}\0Secondary\0", group, name)), + }, + ); + let (reader, writer) = mock.into_io(); + let client = Client::new(reader, writer); + let response = client.read_setting(group, name).unwrap().unwrap(); + assert_eq!( + response.value, + Some(SettingValue::String("Secondary".into())) + ); + } + + #[derive(Clone)] + struct Mock { + stream: mockstream::SyncMockStream, + write_errors: u16, + } + + impl Mock { + fn new() -> Self { + Self { + stream: mockstream::SyncMockStream::new(), + write_errors: 0, + } + } - let request_msg = MsgSettingsReadReq { - sender_id: Some(SETTINGS_SENDER_ID), - setting: SbpString::from(format!("{}\0{}\0", group, name)), - }; + fn with_errors(write_errors: u16) -> Self { + Self { + stream: mockstream::SyncMockStream::new(), + write_errors, + } + } - stream.wait_for(sbp::to_vec(&request_msg).unwrap().as_ref()); + fn req_reply(&mut self, req: impl SbpMessage, res: impl SbpMessage) { + self.reqs_reply(&[req], res) + } - let reply_msg = MsgSettingsReadResp { - sender_id: Some(0x42), - setting: SbpString::from(format!("{}\0{}\0Secondary\0", group, name)), - }; + fn reqs_reply(&mut self, reqs: &[impl SbpMessage], res: impl SbpMessage) { + let bytes: Vec<_> = reqs + .iter() + .flat_map(|req| sbp::to_vec(req).unwrap()) + .collect(); + self.stream.wait_for(bytes.as_ref()); + let bytes = sbp::to_vec(&res).unwrap(); + self.stream.push_bytes_to_read(bytes.as_ref()); + } + + fn into_io(self) -> (impl io::Read, impl io::Write) { + (self.clone(), self) + } + } - stream.push_bytes_to_read(sbp::to_vec(&reply_msg).unwrap().as_ref()); + impl Read for Mock { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.stream.read(buf) + } + } - let response = read_setting(stream.clone(), stream, group, name) - .unwrap() - .unwrap(); - assert_eq!(response, SettingValue::String("Secondary".into())); + impl Write for Mock { + fn write(&mut self, buf: &[u8]) -> io::Result { + if self.write_errors > 0 { + self.write_errors -= 1; + Err(io::Error::new(io::ErrorKind::Other, "error")) + } else { + self.stream.write(buf) + } + } + + fn flush(&mut self) -> io::Result<()> { + self.stream.flush() + } } } diff --git a/src/lib.rs b/src/lib.rs index b5f21cf..bed6f52 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,14 +1,5 @@ mod client; pub mod settings; -pub use client::{Client, Error, ReadSettingError, WriteSettingError}; +pub use client::{Client, Context, Error, ReadResp, ReadSettingError, WriteSettingError}; pub use settings::{Setting, SettingKind, SettingValue}; - -pub(crate) mod bindings { - #![allow(non_upper_case_globals)] - #![allow(non_camel_case_types)] - #![allow(non_snake_case)] - #![allow(deref_nullptr)] - #![allow(dead_code)] - include!(concat!(env!("OUT_DIR"), "/bindings.rs")); -} diff --git a/src/settings.rs b/src/settings.rs index 84e489d..1abbd93 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -23,7 +23,7 @@ pub fn load(settings: Vec) -> Result<(), Vec> { SETTINGS.set(settings) } -#[derive(Debug, Clone, PartialEq, Deserialize)] +#[derive(Debug, Default, Clone, PartialEq, Deserialize)] pub struct Setting { pub name: String, @@ -103,6 +103,12 @@ pub enum SettingKind { PackedBitfield, } +impl Default for SettingKind { + fn default() -> Self { + SettingKind::String + } +} + impl SettingKind { pub fn to_str(&self) -> &'static str { match self { @@ -125,6 +131,21 @@ pub enum SettingValue { String(String), } +impl SettingValue { + pub fn parse(v: &str, kind: SettingKind) -> Option { + match kind { + SettingKind::Integer => v.parse().ok().map(SettingValue::Integer), + SettingKind::Boolean if v == "True" => Some(SettingValue::Boolean(true)), + SettingKind::Boolean if v == "False" => Some(SettingValue::Boolean(false)), + SettingKind::Float | SettingKind::Double => v.parse().ok().map(SettingValue::Float), + SettingKind::String | SettingKind::Enum | SettingKind::PackedBitfield => { + Some(SettingValue::String(v.to_owned())) + } + _ => None, + } + } +} + impl fmt::Display for SettingValue { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { From 59d8e46fb331057aba38d3cfb976f5b2ab0c0a8a Mon Sep 17 00:00:00 2001 From: Steve Meyer Date: Mon, 22 Nov 2021 19:16:41 -0800 Subject: [PATCH 02/13] cleanup --- src/client.rs | 258 ++++++++++++++++++++++++++------------------------ 1 file changed, 136 insertions(+), 122 deletions(-) diff --git a/src/client.rs b/src/client.rs index 86ca37d..719bb55 100644 --- a/src/client.rs +++ b/src/client.rs @@ -14,7 +14,7 @@ use crossbeam_utils::thread; use log::warn; use parking_lot::Mutex; use sbp::{ - link::{Link, LinkSource}, + link::{Key, Link, LinkSource}, messages::settings::{ MsgSettingsReadByIndexDone, MsgSettingsReadByIndexReq, MsgSettingsReadByIndexResp, MsgSettingsReadReq, MsgSettingsReadResp, MsgSettingsWrite, MsgSettingsWriteResp, @@ -31,6 +31,14 @@ pub struct Client<'a> { link: Link<'a, ()>, sender: MsgSender, handle: Option>, + done_rx: Receiver<()>, + key: Key, +} + +impl Drop for Client<'_> { + fn drop(&mut self) { + self.link.unregister(self.key); + } } impl<'a> Client<'a> { @@ -56,10 +64,14 @@ impl<'a> Client<'a> { where F: FnMut(Sbp) -> Result<(), BoxedError> + Send + 'static, { + let (done_tx, done_rx) = crossbeam_channel::bounded(NUM_WORKERS); + let key = link.register(on_read_by_index_done(done_tx)); Self { link, sender: MsgSender(Arc::new(Mutex::new(Box::new(sender)))), handle: None, + done_rx, + key, } } @@ -111,15 +123,6 @@ impl<'a> Client<'a> { } fn read_all_inner(&self, ctx: Context) -> (Vec, Vec>) { - let (done_tx, done_rx) = crossbeam_channel::bounded(NUM_WORKERS); - let key = self.link.register(move |_: MsgSettingsReadByIndexDone| { - if !done_tx.is_empty() { - return; - } - for _ in 0..NUM_WORKERS { - done_tx.try_send(()).unwrap(); - } - }); let (settings, errors) = (Mutex::new(Vec::new()), Mutex::new(Vec::new())); let idx = AtomicU16::new(0); thread::scope(|scope| { @@ -127,15 +130,26 @@ impl<'a> Client<'a> { let idx = &idx; let settings = &settings; let errors = &errors; - let done_rx = &done_rx; let ctx = ctx.clone(); - scope.spawn(move |_| { - self.worker_thd(idx, settings, errors, done_rx, ctx); + scope.spawn(move |_| loop { + let idx = idx.fetch_add(1, Ordering::SeqCst); + match self.read_by_index(idx, &ctx) { + Ok(Some(setting)) => { + settings.lock().push((idx, setting)); + } + Ok(None) => break, + Err(err) => { + let exit = matches!(err, Error::TimedOut | Error::Canceled); + errors.lock().push((idx, err)); + if exit { + break; + } + } + } }); } }) .expect("read_all worker thread panicked"); - self.link.unregister(key); settings.lock().sort_by_key(|(idx, _)| *idx); errors.lock().sort_by_key(|(idx, _)| *idx); ( @@ -144,97 +158,20 @@ impl<'a> Client<'a> { ) } - fn worker_thd( - &self, - idx: &AtomicU16, - settings: &Mutex>, - errors: &Mutex)>>, - done_rx: &Receiver<()>, - ctx: Context, - ) { - loop { - let idx = idx.fetch_add(1, Ordering::SeqCst); - match self.read_by_index(idx, done_rx, &ctx) { - Ok(Some(setting)) => { - settings.lock().push((idx, setting)); - } - Ok(None) => { - break; - } - Err(err) => { - let exit = matches!(err, Error::TimedOut | Error::Canceled); - errors.lock().push((idx, err)); - if exit { - break; - } - } - } - } - } - fn read_by_index( &self, idx: u16, - done_rx: &Receiver<()>, ctx: &Context, ) -> Result, Error> { let (tx, rx) = crossbeam_channel::bounded(1); - let key = self.link.register(move |msg: MsgSettingsReadByIndexResp| { - if idx != msg.index { - return; - } - let setting = msg.setting.to_string(); - let mut fields = setting.split('\0'); - let group = fields.next().unwrap_or_default(); - let name = fields.next().unwrap_or_default(); - let entry = Setting::find(group, name); - if let Some(entry) = entry { - let value = fields - .next() - .and_then(|s| SettingValue::parse(s, entry.kind)); - let fmt_type = fields.next(); - if entry.kind == SettingKind::Enum { - if let Some(fmt_type) = fmt_type { - let mut parts = fmt_type.splitn(2, ':'); - let possible_values = parts.nth(1); - if let Some(p) = possible_values { - let mut entry = entry.clone(); - entry.enumerated_possible_values = Some(p.to_owned()); - let _ = tx.send(ReadResp { - entry: Cow::Owned(entry), - value, - }); - return; - } - } - } - let _ = tx.send(ReadResp { - entry: Cow::Borrowed(entry), - value, - }); - } else { - warn!( - "No settings documentation entry or name: {} in group: {}", - name, group - ); - let entry: Cow = Cow::Owned(Setting { - group: group.to_owned(), - name: name.to_owned(), - ..Default::default() - }); - let value = fields - .next() - .and_then(|s| SettingValue::parse(s, entry.kind)); - let _ = tx.send(ReadResp { entry, value }); - } - }); + let key = self.link.register(on_read_by_index_resp(idx, tx)); self.sender.send(MsgSettingsReadByIndexReq { sender_id: Some(SENDER_ID), index: idx, })?; let res = crossbeam_channel::select! { recv(rx) -> msg => Ok(Some(msg.unwrap())), - recv(done_rx) -> _ => Ok(None), + recv(self.done_rx) -> _ => Ok(None), recv(ctx.timeout_rx) -> _ => Err(Error::TimedOut), recv(ctx.cancel_rx) -> _ => Err(Error::Canceled), }; @@ -257,23 +194,7 @@ impl<'a> Client<'a> { setting: format!("{}\0{}\0", group, name).into(), }; let (tx, rx) = crossbeam_channel::bounded(1); - let key = self.link.register(move |msg: MsgSettingsReadResp| { - let setting = msg.setting.to_string(); - let mut fields = setting.split('\0'); - if fields.next() == Some(&group) && fields.next() == Some(&name) { - match fields.next() { - Some(value) => { - let _ = tx.try_send(Some(ReadResp { - entry: Cow::Borrowed(entry), - value: SettingValue::parse(value, entry.kind), - })); - } - None => { - let _ = tx.try_send(None); - } - } - } - }); + let key = self.link.register(on_read_resp(group, name, tx, entry)); self.sender.send(read_req)?; let res = crossbeam_channel::select! { recv(rx) -> msg => Ok(msg.unwrap()), @@ -296,17 +217,7 @@ impl<'a> Client<'a> { setting: format!("{}\0{}\0{}\0", group, name, value).into(), }; let (tx, rx) = crossbeam_channel::bounded(1); - let key = self.link.register(move |msg: MsgSettingsWriteResp| { - let setting = msg.setting.to_string(); - let mut fields = setting.split('\0'); - if fields.next() == Some(&group) && fields.next() == Some(&name) { - if msg.status == 0 { - let _ = tx.send(Ok(())); - } else { - let _ = tx.send(Err(Error::Err(WriteSettingError::from(msg.status)))); - } - } - }); + let key = self.link.register(on_write_resp(group, name, tx)); self.sender.send(write_req)?; let res = crossbeam_channel::select! { recv(rx) -> msg => msg.unwrap(), @@ -318,6 +229,109 @@ impl<'a> Client<'a> { } } +fn on_read_by_index_done(done_tx: Sender<()>) -> impl FnMut(MsgSettingsReadByIndexDone) { + move |_: MsgSettingsReadByIndexDone| { + for _ in 0..NUM_WORKERS { + let _ = done_tx.try_send(()); + } + } +} + +fn on_write_resp( + group: String, + name: String, + tx: Sender>>, +) -> impl FnMut(MsgSettingsWriteResp) { + move |msg: MsgSettingsWriteResp| { + let setting = msg.setting.to_string(); + let mut fields = setting.split('\0'); + if fields.next() == Some(&group) && fields.next() == Some(&name) { + if msg.status == 0 { + let _ = tx.send(Ok(())); + } else { + let _ = tx.send(Err(Error::Err(WriteSettingError::from(msg.status)))); + } + } + } +} + +fn on_read_resp( + group: String, + name: String, + tx: Sender>, + entry: &'static Setting, +) -> impl FnMut(MsgSettingsReadResp) { + move |msg: MsgSettingsReadResp| { + let setting = msg.setting.to_string(); + let mut fields = setting.split('\0'); + if fields.next() == Some(&group) && fields.next() == Some(&name) { + match fields.next() { + Some(value) => { + let _ = tx.try_send(Some(ReadResp { + entry: Cow::Borrowed(entry), + value: SettingValue::parse(value, entry.kind), + })); + } + None => { + let _ = tx.try_send(None); + } + } + } + } +} + +fn on_read_by_index_resp(idx: u16, tx: Sender) -> impl FnMut(MsgSettingsReadByIndexResp) { + move |msg: MsgSettingsReadByIndexResp| { + if idx != msg.index { + return; + } + let setting = msg.setting.to_string(); + let mut fields = setting.split('\0'); + let group = fields.next().unwrap_or_default(); + let name = fields.next().unwrap_or_default(); + let entry = Setting::find(group, name); + if let Some(entry) = entry { + let value = fields + .next() + .and_then(|s| SettingValue::parse(s, entry.kind)); + let fmt_type = fields.next(); + if entry.kind == SettingKind::Enum { + if let Some(fmt_type) = fmt_type { + let mut parts = fmt_type.splitn(2, ':'); + let possible_values = parts.nth(1); + if let Some(p) = possible_values { + let mut entry = entry.clone(); + entry.enumerated_possible_values = Some(p.to_owned()); + let _ = tx.send(ReadResp { + entry: Cow::Owned(entry), + value, + }); + return; + } + } + } + let _ = tx.send(ReadResp { + entry: Cow::Borrowed(entry), + value, + }); + } else { + warn!( + "No settings documentation entry or name: {} in group: {}", + name, group + ); + let entry: Cow = Cow::Owned(Setting { + group: group.to_owned(), + name: name.to_owned(), + ..Default::default() + }); + let value = fields + .next() + .and_then(|s| SettingValue::parse(s, entry.kind)); + let _ = tx.send(ReadResp { entry, value }); + } + } +} + #[derive(Debug, Clone, PartialEq)] pub struct ReadResp { pub entry: Cow<'static, Setting>, @@ -414,7 +428,7 @@ struct MsgSender(Arc>); impl MsgSender { const RETRIES: usize = 5; - const TIMEOUT: Duration = Duration::from_millis(500); + const TIMEOUT: Duration = Duration::from_millis(100); fn send(&self, msg: impl Into) -> Result<(), BoxedError> { self.send_inner(msg.into(), 0) From cd97ab6e692892dfd9e1d2a2a6a09b42be3da2c9 Mon Sep 17 00:00:00 2001 From: Steve Meyer Date: Mon, 22 Nov 2021 20:22:16 -0800 Subject: [PATCH 03/13] cleanup --- src/client.rs | 111 +++++++++++++++++++----------------------------- src/settings.rs | 13 ++++++ 2 files changed, 56 insertions(+), 68 deletions(-) diff --git a/src/client.rs b/src/client.rs index 719bb55..d484237 100644 --- a/src/client.rs +++ b/src/client.rs @@ -11,7 +11,6 @@ use std::{ use crossbeam_channel::{Receiver, Sender}; use crossbeam_utils::thread; -use log::warn; use parking_lot::Mutex; use sbp::{ link::{Key, Link, LinkSource}, @@ -32,12 +31,12 @@ pub struct Client<'a> { sender: MsgSender, handle: Option>, done_rx: Receiver<()>, - key: Key, + done_key: Key, } impl Drop for Client<'_> { fn drop(&mut self) { - self.link.unregister(self.key); + self.link.unregister(self.done_key); } } @@ -65,13 +64,13 @@ impl<'a> Client<'a> { F: FnMut(Sbp) -> Result<(), BoxedError> + Send + 'static, { let (done_tx, done_rx) = crossbeam_channel::bounded(NUM_WORKERS); - let key = link.register(on_read_by_index_done(done_tx)); + let done_key = link.register(on_read_by_index_done(done_tx)); Self { link, sender: MsgSender(Arc::new(Mutex::new(Box::new(sender)))), handle: None, done_rx, - key, + done_key, } } @@ -164,7 +163,7 @@ impl<'a> Client<'a> { ctx: &Context, ) -> Result, Error> { let (tx, rx) = crossbeam_channel::bounded(1); - let key = self.link.register(on_read_by_index_resp(idx, tx)); + let key = self.link.register(on_read_by_index_resp(tx, idx)); self.sender.send(MsgSettingsReadByIndexReq { sender_id: Some(SENDER_ID), index: idx, @@ -185,17 +184,14 @@ impl<'a> Client<'a> { name: String, ctx: Context, ) -> Result, Error> { - let entry = match Setting::find(&group, &name) { - Some(setting) => setting, - None => return Ok(None), - }; - let read_req = MsgSettingsReadReq { + let (tx, rx) = crossbeam_channel::bounded(1); + let key = self + .link + .register(on_read_resp(tx, group.clone(), name.clone())); + self.sender.send(MsgSettingsReadReq { sender_id: Some(SENDER_ID), setting: format!("{}\0{}\0", group, name).into(), - }; - let (tx, rx) = crossbeam_channel::bounded(1); - let key = self.link.register(on_read_resp(group, name, tx, entry)); - self.sender.send(read_req)?; + })?; let res = crossbeam_channel::select! { recv(rx) -> msg => Ok(msg.unwrap()), recv(ctx.timeout_rx) -> _ => Err(Error::TimedOut), @@ -212,13 +208,14 @@ impl<'a> Client<'a> { value: String, ctx: Context, ) -> Result<(), Error> { - let write_req = MsgSettingsWrite { + let (tx, rx) = crossbeam_channel::bounded(1); + let key = self + .link + .register(on_write_resp(tx, group.clone(), name.clone())); + self.sender.send(MsgSettingsWrite { sender_id: Some(SENDER_ID), setting: format!("{}\0{}\0{}\0", group, name, value).into(), - }; - let (tx, rx) = crossbeam_channel::bounded(1); - let key = self.link.register(on_write_resp(group, name, tx)); - self.sender.send(write_req)?; + })?; let res = crossbeam_channel::select! { recv(rx) -> msg => msg.unwrap(), recv(ctx.timeout_rx) -> _ => Err(Error::TimedOut), @@ -229,18 +226,18 @@ impl<'a> Client<'a> { } } -fn on_read_by_index_done(done_tx: Sender<()>) -> impl FnMut(MsgSettingsReadByIndexDone) { +fn on_read_by_index_done(tx: Sender<()>) -> impl FnMut(MsgSettingsReadByIndexDone) { move |_: MsgSettingsReadByIndexDone| { for _ in 0..NUM_WORKERS { - let _ = done_tx.try_send(()); + let _ = tx.try_send(()); } } } fn on_write_resp( + tx: Sender>>, group: String, name: String, - tx: Sender>>, ) -> impl FnMut(MsgSettingsWriteResp) { move |msg: MsgSettingsWriteResp| { let setting = msg.setting.to_string(); @@ -256,21 +253,22 @@ fn on_write_resp( } fn on_read_resp( + tx: Sender>, group: String, name: String, - tx: Sender>, - entry: &'static Setting, ) -> impl FnMut(MsgSettingsReadResp) { move |msg: MsgSettingsReadResp| { let setting = msg.setting.to_string(); let mut fields = setting.split('\0'); if fields.next() == Some(&group) && fields.next() == Some(&name) { + let entry = Setting::find(&group, &name).map_or_else( + || Cow::Owned(Setting::unknown(group.clone(), name.clone())), + Cow::Borrowed, + ); match fields.next() { - Some(value) => { - let _ = tx.try_send(Some(ReadResp { - entry: Cow::Borrowed(entry), - value: SettingValue::parse(value, entry.kind), - })); + Some(v) => { + let value = SettingValue::parse(v, entry.kind); + let _ = tx.try_send(Some(ReadResp { entry, value })); } None => { let _ = tx.try_send(None); @@ -280,7 +278,7 @@ fn on_read_resp( } } -fn on_read_by_index_resp(idx: u16, tx: Sender) -> impl FnMut(MsgSettingsReadByIndexResp) { +fn on_read_by_index_resp(tx: Sender, idx: u16) -> impl FnMut(MsgSettingsReadByIndexResp) { move |msg: MsgSettingsReadByIndexResp| { if idx != msg.index { return; @@ -289,46 +287,23 @@ fn on_read_by_index_resp(idx: u16, tx: Sender) -> impl FnMut(MsgSettin let mut fields = setting.split('\0'); let group = fields.next().unwrap_or_default(); let name = fields.next().unwrap_or_default(); - let entry = Setting::find(group, name); - if let Some(entry) = entry { - let value = fields - .next() - .and_then(|s| SettingValue::parse(s, entry.kind)); - let fmt_type = fields.next(); - if entry.kind == SettingKind::Enum { - if let Some(fmt_type) = fmt_type { - let mut parts = fmt_type.splitn(2, ':'); - let possible_values = parts.nth(1); - if let Some(p) = possible_values { - let mut entry = entry.clone(); - entry.enumerated_possible_values = Some(p.to_owned()); - let _ = tx.send(ReadResp { - entry: Cow::Owned(entry), - value, - }); - return; - } + let mut entry = Setting::find(&group, &name).map_or_else( + || Cow::Owned(Setting::unknown(group.to_owned(), name.to_owned())), + Cow::Borrowed, + ); + let value = fields + .next() + .and_then(|s| SettingValue::parse(s, entry.kind)); + if entry.kind == SettingKind::Enum { + if let Some(fmt_type) = fields.next() { + let mut parts = fmt_type.splitn(2, ':'); + let possible_values = parts.nth(1); + if let Some(p) = possible_values { + entry.to_mut().enumerated_possible_values = Some(p.to_owned()); } } - let _ = tx.send(ReadResp { - entry: Cow::Borrowed(entry), - value, - }); - } else { - warn!( - "No settings documentation entry or name: {} in group: {}", - name, group - ); - let entry: Cow = Cow::Owned(Setting { - group: group.to_owned(), - name: name.to_owned(), - ..Default::default() - }); - let value = fields - .next() - .and_then(|s| SettingValue::parse(s, entry.kind)); - let _ = tx.send(ReadResp { entry, value }); } + let _ = tx.try_send(ReadResp { entry, value }); } } diff --git a/src/settings.rs b/src/settings.rs index 1abbd93..02f5263 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -1,5 +1,6 @@ use std::{fmt, fs, io, path::Path}; +use log::warn; use once_cell::sync::OnceCell; use serde::{ de::{self, Unexpected}, @@ -77,6 +78,18 @@ impl Setting { .iter() .find(|s| s.group == group && s.name == name) } + + pub(crate) fn unknown(group: String, name: String) -> Setting { + warn!( + "No settings documentation entry or name: {} in group: {}", + name, group + ); + Setting { + group, + name, + ..Default::default() + } + } } #[derive(Debug, Clone, Copy, PartialEq, Deserialize)] From a389ee55d36a51a43931da153f2b864498db7ee1 Mon Sep 17 00:00:00 2001 From: Steve Meyer Date: Tue, 23 Nov 2021 13:31:19 -0800 Subject: [PATCH 04/13] cleanup --- src/client.rs | 266 +++++++++++++++++++++++++----------------------- src/lib.rs | 2 +- src/settings.rs | 43 ++++++-- 3 files changed, 172 insertions(+), 139 deletions(-) diff --git a/src/client.rs b/src/client.rs index d484237..791b2d8 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,5 +1,6 @@ use std::{ borrow::Cow, + convert::TryFrom, io, sync::{ atomic::{AtomicU16, AtomicUsize, Ordering}, @@ -13,15 +14,16 @@ use crossbeam_channel::{Receiver, Sender}; use crossbeam_utils::thread; use parking_lot::Mutex; use sbp::{ - link::{Key, Link, LinkSource}, + link::{Link, LinkSource}, messages::settings::{ MsgSettingsReadByIndexDone, MsgSettingsReadByIndexReq, MsgSettingsReadByIndexResp, MsgSettingsReadReq, MsgSettingsReadResp, MsgSettingsWrite, MsgSettingsWriteResp, }, - Sbp, SbpIterExt, + sbp_string::Multipart, + Sbp, SbpIterExt, SbpString, }; -use crate::{Setting, SettingKind, SettingValue}; +use crate::{Setting, SettingValue}; const SENDER_ID: u16 = 0x42; const NUM_WORKERS: usize = 10; @@ -30,14 +32,6 @@ pub struct Client<'a> { link: Link<'a, ()>, sender: MsgSender, handle: Option>, - done_rx: Receiver<()>, - done_key: Key, -} - -impl Drop for Client<'_> { - fn drop(&mut self) { - self.link.unregister(self.done_key); - } } impl<'a> Client<'a> { @@ -63,14 +57,10 @@ impl<'a> Client<'a> { where F: FnMut(Sbp) -> Result<(), BoxedError> + Send + 'static, { - let (done_tx, done_rx) = crossbeam_channel::bounded(NUM_WORKERS); - let done_key = link.register(on_read_by_index_done(done_tx)); Self { link, sender: MsgSender(Arc::new(Mutex::new(Box::new(sender)))), handle: None, - done_rx, - done_key, } } @@ -79,7 +69,7 @@ impl<'a> Client<'a> { group: impl Into, name: impl Into, value: impl Into, - ) -> Result<(), Error> { + ) -> Result { let (ctx, _ctx_handle) = Context::new(); self.write_setting_ctx(group, name, value, ctx) } @@ -90,7 +80,7 @@ impl<'a> Client<'a> { name: impl Into, value: impl Into, ctx: Context, - ) -> Result<(), Error> { + ) -> Result { self.write_setting_inner(group.into(), name.into(), value.into(), ctx) } @@ -98,7 +88,7 @@ impl<'a> Client<'a> { &self, group: impl Into, name: impl Into, - ) -> Result, Error> { + ) -> Result, Error> { let (ctx, _ctx_handle) = Context::new(); self.read_setting_ctx(group, name, ctx) } @@ -108,20 +98,26 @@ impl<'a> Client<'a> { group: impl Into, name: impl Into, ctx: Context, - ) -> Result, Error> { + ) -> Result, Error> { self.read_setting_inner(group.into(), name.into(), ctx) } - pub fn read_all(&self) -> (Vec, Vec>) { + pub fn read_all(&self) -> (Vec, Vec) { let (ctx, _ctx_handle) = Context::new(); self.read_all_ctx(ctx) } - pub fn read_all_ctx(&self, ctx: Context) -> (Vec, Vec>) { + pub fn read_all_ctx(&self, ctx: Context) -> (Vec, Vec) { self.read_all_inner(ctx) } - fn read_all_inner(&self, ctx: Context) -> (Vec, Vec>) { + fn read_all_inner(&self, ctx: Context) -> (Vec, Vec) { + let (done_tx, done_rx) = crossbeam_channel::bounded(NUM_WORKERS); + let done_key = self.link.register(move |_: MsgSettingsReadByIndexDone| { + for _ in 0..NUM_WORKERS { + let _ = done_tx.try_send(()); + } + }); let (settings, errors) = (Mutex::new(Vec::new()), Mutex::new(Vec::new())); let idx = AtomicU16::new(0); thread::scope(|scope| { @@ -129,10 +125,11 @@ impl<'a> Client<'a> { let idx = &idx; let settings = &settings; let errors = &errors; + let done_rx = &done_rx; let ctx = ctx.clone(); scope.spawn(move |_| loop { let idx = idx.fetch_add(1, Ordering::SeqCst); - match self.read_by_index(idx, &ctx) { + match self.read_by_index(idx, done_rx, &ctx) { Ok(Some(setting)) => { settings.lock().push((idx, setting)); } @@ -149,6 +146,7 @@ impl<'a> Client<'a> { } }) .expect("read_all worker thread panicked"); + self.link.unregister(done_key); settings.lock().sort_by_key(|(idx, _)| *idx); errors.lock().sort_by_key(|(idx, _)| *idx); ( @@ -159,18 +157,24 @@ impl<'a> Client<'a> { fn read_by_index( &self, - idx: u16, + index: u16, + done_rx: &Receiver<()>, ctx: &Context, - ) -> Result, Error> { + ) -> Result, Error> { let (tx, rx) = crossbeam_channel::bounded(1); - let key = self.link.register(on_read_by_index_resp(tx, idx)); + let key = self.link.register(move |msg: MsgSettingsReadByIndexResp| { + if index != msg.index { + return; + } + let _ = tx.try_send(Entry::try_from(msg)); + }); self.sender.send(MsgSettingsReadByIndexReq { sender_id: Some(SENDER_ID), - index: idx, + index, })?; let res = crossbeam_channel::select! { - recv(rx) -> msg => Ok(Some(msg.unwrap())), - recv(self.done_rx) -> _ => Ok(None), + recv(rx) -> msg => msg.unwrap().map(Some), + recv(done_rx) -> _ => Ok(None), recv(ctx.timeout_rx) -> _ => Err(Error::TimedOut), recv(ctx.cancel_rx) -> _ => Err(Error::Canceled), }; @@ -183,17 +187,31 @@ impl<'a> Client<'a> { group: String, name: String, ctx: Context, - ) -> Result, Error> { + ) -> Result, Error> { let (tx, rx) = crossbeam_channel::bounded(1); - let key = self - .link - .register(on_read_resp(tx, group.clone(), name.clone())); + let key = self.link.register({ + let group = group.clone(); + let name = name.clone(); + move |msg: MsgSettingsReadResp| { + let fields = split_multipart(&msg.setting); + if fields.len() < 2 || fields[0] != group || fields[1] != name { + return; + } + let _ = tx.send(Entry::try_from(msg).map(|e| { + if e.value.is_some() { + Some(e) + } else { + None + } + })); + } + }); self.sender.send(MsgSettingsReadReq { sender_id: Some(SENDER_ID), setting: format!("{}\0{}\0", group, name).into(), })?; let res = crossbeam_channel::select! { - recv(rx) -> msg => Ok(msg.unwrap()), + recv(rx) -> msg => msg.unwrap(), recv(ctx.timeout_rx) -> _ => Err(Error::TimedOut), recv(ctx.cancel_rx) -> _ => Err(Error::Canceled), }; @@ -207,11 +225,24 @@ impl<'a> Client<'a> { name: String, value: String, ctx: Context, - ) -> Result<(), Error> { + ) -> Result { let (tx, rx) = crossbeam_channel::bounded(1); - let key = self - .link - .register(on_write_resp(tx, group.clone(), name.clone())); + let key = self.link.register({ + let group = group.clone(); + let name = name.clone(); + move |msg: MsgSettingsWriteResp| { + let fields = split_multipart(&msg.setting); + if fields.len() < 2 || fields[0] != group || fields[1] != name { + return; + } + if msg.status != 0 { + let e = Error::WriteError(WriteSettingError::from(msg.status)); + let _ = tx.send(Err(e)); + return; + } + let _ = tx.send(Entry::try_from(msg)); + } + }); self.sender.send(MsgSettingsWrite { sender_id: Some(SENDER_ID), setting: format!("{}\0{}\0{}\0", group, name, value).into(), @@ -226,91 +257,82 @@ impl<'a> Client<'a> { } } -fn on_read_by_index_done(tx: Sender<()>) -> impl FnMut(MsgSettingsReadByIndexDone) { - move |_: MsgSettingsReadByIndexDone| { - for _ in 0..NUM_WORKERS { - let _ = tx.try_send(()); - } - } +#[derive(Debug, Clone, PartialEq)] +pub struct Entry { + pub setting: Cow<'static, Setting>, + pub value: Option, } -fn on_write_resp( - tx: Sender>>, - group: String, - name: String, -) -> impl FnMut(MsgSettingsWriteResp) { - move |msg: MsgSettingsWriteResp| { - let setting = msg.setting.to_string(); - let mut fields = setting.split('\0'); - if fields.next() == Some(&group) && fields.next() == Some(&name) { - if msg.status == 0 { - let _ = tx.send(Ok(())); - } else { - let _ = tx.send(Err(Error::Err(WriteSettingError::from(msg.status)))); - } +impl TryFrom for Entry { + type Error = Error; + + fn try_from(msg: MsgSettingsWriteResp) -> Result { + let fields = split_multipart(&msg.setting); + if let [group, name, value] = fields.as_slice() { + let setting = Setting::new(group, name); + let value = SettingValue::parse(value, setting.kind); + Ok(Entry { setting, value }) + } else { + Err(Error::ParseError) } } } -fn on_read_resp( - tx: Sender>, - group: String, - name: String, -) -> impl FnMut(MsgSettingsReadResp) { - move |msg: MsgSettingsReadResp| { - let setting = msg.setting.to_string(); - let mut fields = setting.split('\0'); - if fields.next() == Some(&group) && fields.next() == Some(&name) { - let entry = Setting::find(&group, &name).map_or_else( - || Cow::Owned(Setting::unknown(group.clone(), name.clone())), - Cow::Borrowed, - ); - match fields.next() { - Some(v) => { - let value = SettingValue::parse(v, entry.kind); - let _ = tx.try_send(Some(ReadResp { entry, value })); - } - None => { - let _ = tx.try_send(None); - } +impl TryFrom for Entry { + type Error = Error; + + fn try_from(msg: MsgSettingsReadResp) -> Result { + let fields = split_multipart(&msg.setting); + match fields.as_slice() { + [group, name] => { + let setting = Setting::new(&group, &name); + Ok(Entry { + setting, + value: None, + }) + } + [group, name, value] | [group, name, value, ..] => { + let setting = Setting::new(&group, &name); + let value = SettingValue::parse(value, setting.kind); + Ok(Entry { setting, value }) } + _ => Err(Error::ParseError), } } } -fn on_read_by_index_resp(tx: Sender, idx: u16) -> impl FnMut(MsgSettingsReadByIndexResp) { - move |msg: MsgSettingsReadByIndexResp| { - if idx != msg.index { - return; - } - let setting = msg.setting.to_string(); - let mut fields = setting.split('\0'); - let group = fields.next().unwrap_or_default(); - let name = fields.next().unwrap_or_default(); - let mut entry = Setting::find(&group, &name).map_or_else( - || Cow::Owned(Setting::unknown(group.to_owned(), name.to_owned())), - Cow::Borrowed, - ); - let value = fields - .next() - .and_then(|s| SettingValue::parse(s, entry.kind)); - if entry.kind == SettingKind::Enum { - if let Some(fmt_type) = fields.next() { - let mut parts = fmt_type.splitn(2, ':'); - let possible_values = parts.nth(1); - if let Some(p) = possible_values { - entry.to_mut().enumerated_possible_values = Some(p.to_owned()); - } +impl TryFrom for Entry { + type Error = Error; + + fn try_from(msg: MsgSettingsReadByIndexResp) -> Result { + let fields = split_multipart(&msg.setting); + match fields.as_slice() { + [group, name, value, fmt_type] => { + let setting = if fmt_type.is_empty() { + Setting::new(group, name) + } else { + Setting::with_fmt_type(group, name, fmt_type) + }; + let value = if !value.is_empty() { + SettingValue::parse(value, setting.kind) + } else { + None + }; + Ok(Entry { setting, value }) } + _ => Err(Error::ParseError), } - let _ = tx.try_send(ReadResp { entry, value }); } } -#[derive(Debug, Clone, PartialEq)] -pub struct ReadResp { - pub entry: Cow<'static, Setting>, - pub value: Option, +fn split_multipart(s: &SbpString, Multipart>) -> Vec> { + let mut parts: Vec<_> = s + .as_bytes() + .split(|b| *b == 0) + .map(String::from_utf8_lossy) + .collect(); + parts.pop(); + parts } pub struct Context { @@ -421,34 +443,33 @@ impl MsgSender { } #[derive(Debug)] -pub enum Error { - Err(E), +pub enum Error { SenderError(BoxedError), + WriteError(WriteSettingError), + ParseError, TimedOut, Canceled, } -impl From for Error { +impl From for Error { fn from(v: BoxedError) -> Self { Self::SenderError(v) } } -impl std::fmt::Display for Error -where - E: std::error::Error, -{ +impl std::fmt::Display for Error { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Error::Err(err) => write!(f, "{}", err), + Error::SenderError(err) => write!(f, "{}", err), + Error::WriteError(err) => write!(f, "{}", err), + Error::ParseError => write!(f, "failed to parse setting"), Error::TimedOut => write!(f, "timed out"), Error::Canceled => write!(f, "canceled"), - Error::SenderError(err) => write!(f, "{}", err), } } } -impl std::error::Error for Error where E: std::error::Error {} +impl std::error::Error for Error {} #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum WriteSettingError { @@ -506,17 +527,6 @@ impl std::fmt::Display for WriteSettingError { impl std::error::Error for WriteSettingError {} -#[derive(Debug)] -pub struct ReadSettingError {} - -impl std::fmt::Display for ReadSettingError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "settings read failed") - } -} - -impl std::error::Error for ReadSettingError {} - #[cfg(test)] mod tests { use std::io::{Read, Write}; diff --git a/src/lib.rs b/src/lib.rs index bed6f52..8293086 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,5 @@ mod client; pub mod settings; -pub use client::{Client, Context, Error, ReadResp, ReadSettingError, WriteSettingError}; +pub use client::{Client, Context, Entry, Error, WriteSettingError}; pub use settings::{Setting, SettingKind, SettingValue}; diff --git a/src/settings.rs b/src/settings.rs index 02f5263..24db5d9 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -1,4 +1,4 @@ -use std::{fmt, fs, io, path::Path}; +use std::{borrow::Cow, fmt, fs, io, path::Path}; use log::warn; use once_cell::sync::OnceCell; @@ -79,16 +79,39 @@ impl Setting { .find(|s| s.group == group && s.name == name) } - pub(crate) fn unknown(group: String, name: String) -> Setting { - warn!( - "No settings documentation entry or name: {} in group: {}", - name, group - ); - Setting { - group, - name, - ..Default::default() + pub(crate) fn new(group: impl AsRef, name: impl AsRef) -> Cow<'static, Setting> { + Setting::find(&group, &name).map_or_else( + || { + let group = group.as_ref().to_owned(); + let name = name.as_ref().to_owned(); + warn!( + "No settings documentation entry or name: {} in group: {}", + name, group + ); + Cow::Owned(Setting { + group, + name, + ..Default::default() + }) + }, + Cow::Borrowed, + ) + } + + pub(crate) fn with_fmt_type( + group: impl AsRef, + name: impl AsRef, + fmt_type: impl AsRef, + ) -> Cow<'static, Setting> { + let mut setting = Setting::new(group, name); + if setting.kind == SettingKind::Enum { + let mut parts = fmt_type.as_ref().splitn(2, ':'); + let possible_values = parts.nth(1); + if let Some(p) = possible_values { + setting.to_mut().enumerated_possible_values = Some(p.to_owned()); + } } + setting } } From 4a31915d96b6a073ecca00d885896a82fcb6af30 Mon Sep 17 00:00:00 2001 From: Steve Meyer Date: Tue, 23 Nov 2021 14:07:55 -0800 Subject: [PATCH 05/13] mutable --- src/client.rs | 65 ++++++++++++++++++++------------------------------- 1 file changed, 25 insertions(+), 40 deletions(-) diff --git a/src/client.rs b/src/client.rs index 791b2d8..6a976d7 100644 --- a/src/client.rs +++ b/src/client.rs @@ -65,7 +65,7 @@ impl<'a> Client<'a> { } pub fn write_setting( - &self, + &mut self, group: impl Into, name: impl Into, value: impl Into, @@ -75,7 +75,7 @@ impl<'a> Client<'a> { } pub fn write_setting_ctx( - &self, + &mut self, group: impl Into, name: impl Into, value: impl Into, @@ -85,7 +85,7 @@ impl<'a> Client<'a> { } pub fn read_setting( - &self, + &mut self, group: impl Into, name: impl Into, ) -> Result, Error> { @@ -94,7 +94,7 @@ impl<'a> Client<'a> { } pub fn read_setting_ctx( - &self, + &mut self, group: impl Into, name: impl Into, ctx: Context, @@ -102,16 +102,16 @@ impl<'a> Client<'a> { self.read_setting_inner(group.into(), name.into(), ctx) } - pub fn read_all(&self) -> (Vec, Vec) { + pub fn read_all(&mut self) -> (Vec, Vec) { let (ctx, _ctx_handle) = Context::new(); self.read_all_ctx(ctx) } - pub fn read_all_ctx(&self, ctx: Context) -> (Vec, Vec) { + pub fn read_all_ctx(&mut self, ctx: Context) -> (Vec, Vec) { self.read_all_inner(ctx) } - fn read_all_inner(&self, ctx: Context) -> (Vec, Vec) { + fn read_all_inner(&mut self, ctx: Context) -> (Vec, Vec) { let (done_tx, done_rx) = crossbeam_channel::bounded(NUM_WORKERS); let done_key = self.link.register(move |_: MsgSettingsReadByIndexDone| { for _ in 0..NUM_WORKERS { @@ -122,6 +122,7 @@ impl<'a> Client<'a> { let idx = AtomicU16::new(0); thread::scope(|scope| { for _ in 0..NUM_WORKERS { + let this = &self; let idx = &idx; let settings = &settings; let errors = &errors; @@ -129,7 +130,7 @@ impl<'a> Client<'a> { let ctx = ctx.clone(); scope.spawn(move |_| loop { let idx = idx.fetch_add(1, Ordering::SeqCst); - match self.read_by_index(idx, done_rx, &ctx) { + match this.read_by_index(idx, done_rx, &ctx) { Ok(Some(setting)) => { settings.lock().push((idx, setting)); } @@ -183,7 +184,7 @@ impl<'a> Client<'a> { } fn read_setting_inner( - &self, + &mut self, group: String, name: String, ctx: Context, @@ -220,7 +221,7 @@ impl<'a> Client<'a> { } fn write_setting_inner( - &self, + &mut self, group: String, name: String, value: String, @@ -553,7 +554,7 @@ mod tests { }, ); let (reader, writer) = mock.into_io(); - let client = Client::new(reader, writer); + let mut client = Client::new(reader, writer); let response = client.read_setting(group, name).unwrap().unwrap(); assert!(matches!(response.value, Some(SettingValue::Integer(10)))); } @@ -563,26 +564,18 @@ mod tests { let (group, name) = ("sbp", "obs_msg_max_size"); let mock = Mock::new(); let (reader, writer) = mock.into_io(); - let client = Client::new(reader, writer); + let mut client = Client::new(reader, writer); let (ctx, _ctx_handle) = Context::with_timeout(Duration::from_millis(100)); let now = Instant::now(); - let mut response1 = Ok(None); - let mut response2 = Ok(None); + let mut response = Ok(None); scope(|scope| { - scope.spawn({ - let ctx = ctx.clone(); - |_| { - response1 = client.read_setting_ctx(group, name, ctx); - } - }); scope.spawn(|_| { - response2 = client.read_setting_ctx(group, name, ctx); + response = client.read_setting_ctx(group, name, ctx); }); }) .unwrap(); assert!(now.elapsed().as_millis() >= 100); - assert!(matches!(response1, Err(Error::TimedOut))); - assert!(matches!(response2, Err(Error::TimedOut))); + assert!(matches!(response, Err(Error::TimedOut))); } #[test] @@ -590,28 +583,20 @@ mod tests { let (group, name) = ("sbp", "obs_msg_max_size"); let mock = Mock::new(); let (reader, writer) = mock.into_io(); - let client = Client::new(reader, writer); + let mut client = Client::new(reader, writer); let (ctx, ctx_handle) = Context::new(); let now = Instant::now(); - let mut response1 = Ok(None); - let mut response2 = Ok(None); + let mut response = Ok(None); scope(|scope| { - scope.spawn({ - let ctx = ctx.clone(); - |_| { - response1 = client.read_setting_ctx(group, name, ctx); - } - }); scope.spawn(|_| { - response2 = client.read_setting_ctx(group, name, ctx); + response = client.read_setting_ctx(group, name, ctx); }); std::thread::sleep(Duration::from_millis(100)); ctx_handle.cancel(); }) .unwrap(); assert!(now.elapsed().as_millis() >= 100); - assert!(matches!(response1, Err(Error::Canceled))); - assert!(matches!(response2, Err(Error::Canceled))); + assert!(matches!(response, Err(Error::Canceled))); } #[test] @@ -629,7 +614,7 @@ mod tests { }, ); let (reader, writer) = mock.into_io(); - let client = Client::new(reader, writer); + let mut client = Client::new(reader, writer); let response = client.read_setting(group, name).unwrap().unwrap(); assert!(matches!(response.value, Some(SettingValue::Integer(10)))); } @@ -649,7 +634,7 @@ mod tests { }, ); let (reader, writer) = mock.into_io(); - let client = Client::new(reader, writer); + let mut client = Client::new(reader, writer); let response = client.read_setting(group, name).unwrap().unwrap(); assert!(matches!(response.value, Some(SettingValue::Boolean(true)))); } @@ -669,7 +654,7 @@ mod tests { }, ); let (reader, writer) = mock.into_io(); - let client = Client::new(reader, writer); + let mut client = Client::new(reader, writer); let response = client.read_setting(group, name).unwrap().unwrap(); assert_eq!(response.value, Some(SettingValue::Float(0.1))); } @@ -689,7 +674,7 @@ mod tests { }, ); let (reader, writer) = mock.into_io(); - let client = Client::new(reader, writer); + let mut client = Client::new(reader, writer); let response = client.read_setting(group, name).unwrap().unwrap(); assert_eq!(response.value, Some(SettingValue::String("foo".into()))); } @@ -709,7 +694,7 @@ mod tests { }, ); let (reader, writer) = mock.into_io(); - let client = Client::new(reader, writer); + let mut client = Client::new(reader, writer); let response = client.read_setting(group, name).unwrap().unwrap(); assert_eq!( response.value, From 3bd2ecb2accf6250d3aecb82a9090735ff4bbf8d Mon Sep 17 00:00:00 2001 From: Steve Meyer Date: Tue, 23 Nov 2021 14:20:32 -0800 Subject: [PATCH 06/13] cleanup --- src/client.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/client.rs b/src/client.rs index 6a976d7..214add8 100644 --- a/src/client.rs +++ b/src/client.rs @@ -236,11 +236,6 @@ impl<'a> Client<'a> { if fields.len() < 2 || fields[0] != group || fields[1] != name { return; } - if msg.status != 0 { - let e = Error::WriteError(WriteSettingError::from(msg.status)); - let _ = tx.send(Err(e)); - return; - } let _ = tx.send(Entry::try_from(msg)); } }); @@ -268,6 +263,9 @@ impl TryFrom for Entry { type Error = Error; fn try_from(msg: MsgSettingsWriteResp) -> Result { + if msg.status != 0 { + return Err(Error::WriteError(msg.status.into())); + } let fields = split_multipart(&msg.setting); if let [group, name, value] = fields.as_slice() { let setting = Setting::new(group, name); From 3f56ec744ca3a5d31b9c683a2ae4a91292dfbc32 Mon Sep 17 00:00:00 2001 From: Steve Meyer Date: Wed, 24 Nov 2021 13:08:59 -0800 Subject: [PATCH 07/13] replace unwraps --- src/client.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/client.rs b/src/client.rs index 214add8..e86d505 100644 --- a/src/client.rs +++ b/src/client.rs @@ -174,7 +174,7 @@ impl<'a> Client<'a> { index, })?; let res = crossbeam_channel::select! { - recv(rx) -> msg => msg.unwrap().map(Some), + recv(rx) -> msg => msg.expect("read_by_index channel disconnected").map(Some), recv(done_rx) -> _ => Ok(None), recv(ctx.timeout_rx) -> _ => Err(Error::TimedOut), recv(ctx.cancel_rx) -> _ => Err(Error::Canceled), @@ -198,7 +198,7 @@ impl<'a> Client<'a> { if fields.len() < 2 || fields[0] != group || fields[1] != name { return; } - let _ = tx.send(Entry::try_from(msg).map(|e| { + let _ = tx.try_send(Entry::try_from(msg).map(|e| { if e.value.is_some() { Some(e) } else { @@ -212,7 +212,7 @@ impl<'a> Client<'a> { setting: format!("{}\0{}\0", group, name).into(), })?; let res = crossbeam_channel::select! { - recv(rx) -> msg => msg.unwrap(), + recv(rx) -> msg => msg.expect("read_setting_inner channel disconnected"), recv(ctx.timeout_rx) -> _ => Err(Error::TimedOut), recv(ctx.cancel_rx) -> _ => Err(Error::Canceled), }; @@ -236,7 +236,7 @@ impl<'a> Client<'a> { if fields.len() < 2 || fields[0] != group || fields[1] != name { return; } - let _ = tx.send(Entry::try_from(msg)); + let _ = tx.try_send(Entry::try_from(msg)); } }); self.sender.send(MsgSettingsWrite { @@ -244,7 +244,7 @@ impl<'a> Client<'a> { setting: format!("{}\0{}\0{}\0", group, name, value).into(), })?; let res = crossbeam_channel::select! { - recv(rx) -> msg => msg.unwrap(), + recv(rx) -> msg => msg.expect("write_setting_inner channel disconnected"), recv(ctx.timeout_rx) -> _ => Err(Error::TimedOut), recv(ctx.cancel_rx) -> _ => Err(Error::Canceled), }; From aef6c2c474fa935a3cb0fc5beccef48004a6aa1c Mon Sep 17 00:00:00 2001 From: Steve Meyer Date: Wed, 24 Nov 2021 13:23:57 -0800 Subject: [PATCH 08/13] cleanup handlers --- src/client.rs | 48 +++++++++++++++++++++--------------------------- 1 file changed, 21 insertions(+), 27 deletions(-) diff --git a/src/client.rs b/src/client.rs index e86d505..a673b8a 100644 --- a/src/client.rs +++ b/src/client.rs @@ -164,10 +164,9 @@ impl<'a> Client<'a> { ) -> Result, Error> { let (tx, rx) = crossbeam_channel::bounded(1); let key = self.link.register(move |msg: MsgSettingsReadByIndexResp| { - if index != msg.index { - return; + if index == msg.index { + let _ = tx.try_send(Entry::try_from(msg)); } - let _ = tx.try_send(Entry::try_from(msg)); }); self.sender.send(MsgSettingsReadByIndexReq { sender_id: Some(SENDER_ID), @@ -189,15 +188,13 @@ impl<'a> Client<'a> { name: String, ctx: Context, ) -> Result, Error> { + let req = MsgSettingsReadReq { + sender_id: Some(SENDER_ID), + setting: format!("{}\0{}\0", group, name).into(), + }; let (tx, rx) = crossbeam_channel::bounded(1); - let key = self.link.register({ - let group = group.clone(); - let name = name.clone(); - move |msg: MsgSettingsReadResp| { - let fields = split_multipart(&msg.setting); - if fields.len() < 2 || fields[0] != group || fields[1] != name { - return; - } + let key = self.link.register(move |msg: MsgSettingsReadResp| { + if request_matches(&group, &name, &msg.setting) { let _ = tx.try_send(Entry::try_from(msg).map(|e| { if e.value.is_some() { Some(e) @@ -207,10 +204,7 @@ impl<'a> Client<'a> { })); } }); - self.sender.send(MsgSettingsReadReq { - sender_id: Some(SENDER_ID), - setting: format!("{}\0{}\0", group, name).into(), - })?; + self.sender.send(req)?; let res = crossbeam_channel::select! { recv(rx) -> msg => msg.expect("read_setting_inner channel disconnected"), recv(ctx.timeout_rx) -> _ => Err(Error::TimedOut), @@ -227,22 +221,17 @@ impl<'a> Client<'a> { value: String, ctx: Context, ) -> Result { + let req = MsgSettingsWrite { + sender_id: Some(SENDER_ID), + setting: format!("{}\0{}\0{}\0", group, name, value).into(), + }; let (tx, rx) = crossbeam_channel::bounded(1); - let key = self.link.register({ - let group = group.clone(); - let name = name.clone(); - move |msg: MsgSettingsWriteResp| { - let fields = split_multipart(&msg.setting); - if fields.len() < 2 || fields[0] != group || fields[1] != name { - return; - } + let key = self.link.register(move |msg: MsgSettingsWriteResp| { + if request_matches(&group, &name, &msg.setting) { let _ = tx.try_send(Entry::try_from(msg)); } }); - self.sender.send(MsgSettingsWrite { - sender_id: Some(SENDER_ID), - setting: format!("{}\0{}\0{}\0", group, name, value).into(), - })?; + self.sender.send(req)?; let res = crossbeam_channel::select! { recv(rx) -> msg => msg.expect("write_setting_inner channel disconnected"), recv(ctx.timeout_rx) -> _ => Err(Error::TimedOut), @@ -253,6 +242,11 @@ impl<'a> Client<'a> { } } +fn request_matches(group: &str, name: &str, setting: &SbpString, Multipart>) -> bool { + let fields = split_multipart(setting); + matches!(fields.as_slice(), [g, n, ..] if g == group && n == name) +} + #[derive(Debug, Clone, PartialEq)] pub struct Entry { pub setting: Cow<'static, Setting>, From 88236709345ccc45697b95987156b5f3e19e6817 Mon Sep 17 00:00:00 2001 From: Steve Meyer Date: Wed, 24 Nov 2021 13:43:33 -0800 Subject: [PATCH 09/13] add some logs --- src/client.rs | 4 ++++ src/settings.rs | 5 +---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/client.rs b/src/client.rs index a673b8a..7fc4d11 100644 --- a/src/client.rs +++ b/src/client.rs @@ -12,6 +12,7 @@ use std::{ use crossbeam_channel::{Receiver, Sender}; use crossbeam_utils::thread; +use log::trace; use parking_lot::Mutex; use sbp::{ link::{Link, LinkSource}, @@ -162,6 +163,7 @@ impl<'a> Client<'a> { done_rx: &Receiver<()>, ctx: &Context, ) -> Result, Error> { + trace!("read_by_idx: {}", index); let (tx, rx) = crossbeam_channel::bounded(1); let key = self.link.register(move |msg: MsgSettingsReadByIndexResp| { if index == msg.index { @@ -188,6 +190,7 @@ impl<'a> Client<'a> { name: String, ctx: Context, ) -> Result, Error> { + trace!("read_setting: {} {}", group, name); let req = MsgSettingsReadReq { sender_id: Some(SENDER_ID), setting: format!("{}\0{}\0", group, name).into(), @@ -221,6 +224,7 @@ impl<'a> Client<'a> { value: String, ctx: Context, ) -> Result { + trace!("write_setting: {} {} {}", group, name, value); let req = MsgSettingsWrite { sender_id: Some(SENDER_ID), setting: format!("{}\0{}\0{}\0", group, name, value).into(), diff --git a/src/settings.rs b/src/settings.rs index 24db5d9..3566041 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -84,10 +84,7 @@ impl Setting { || { let group = group.as_ref().to_owned(); let name = name.as_ref().to_owned(); - warn!( - "No settings documentation entry or name: {} in group: {}", - name, group - ); + warn!("No documentation entry setting {} -> {}", group, name); Cow::Owned(Setting { group, name, From d92dbf8473ded1d359490bdf758ef57d9978feb8 Mon Sep 17 00:00:00 2001 From: Steve Meyer Date: Wed, 24 Nov 2021 14:05:17 -0800 Subject: [PATCH 10/13] reorganize --- src/client.rs | 201 +++----------------------------- src/context.rs | 85 ++++++++++++++ src/error.rs | 92 +++++++++++++++ src/lib.rs | 11 +- src/{settings.rs => setting.rs} | 0 tests/load.rs | 4 +- tests/load_from_path.rs | 4 +- 7 files changed, 206 insertions(+), 191 deletions(-) create mode 100644 src/context.rs create mode 100644 src/error.rs rename src/{settings.rs => setting.rs} (100%) diff --git a/src/client.rs b/src/client.rs index 7fc4d11..488dcdb 100644 --- a/src/client.rs +++ b/src/client.rs @@ -3,14 +3,14 @@ use std::{ convert::TryFrom, io, sync::{ - atomic::{AtomicU16, AtomicUsize, Ordering}, + atomic::{AtomicU16, Ordering}, Arc, }, thread::JoinHandle, - time::{Duration, Instant}, + time::Duration, }; -use crossbeam_channel::{Receiver, Sender}; +use crossbeam_channel::Receiver; use crossbeam_utils::thread; use log::trace; use parking_lot::Mutex; @@ -24,7 +24,9 @@ use sbp::{ Sbp, SbpIterExt, SbpString, }; -use crate::{Setting, SettingValue}; +use crate::context::Context; +use crate::error::{BoxedError, Error}; +use crate::setting::{Setting, SettingValue}; const SENDER_ID: u16 = 0x42; const NUM_WORKERS: usize = 10; @@ -246,11 +248,6 @@ impl<'a> Client<'a> { } } -fn request_matches(group: &str, name: &str, setting: &SbpString, Multipart>) -> bool { - let fields = split_multipart(setting); - matches!(fields.as_slice(), [g, n, ..] if g == group && n == name) -} - #[derive(Debug, Clone, PartialEq)] pub struct Entry { pub setting: Cow<'static, Setting>, @@ -322,100 +319,6 @@ impl TryFrom for Entry { } } -fn split_multipart(s: &SbpString, Multipart>) -> Vec> { - let mut parts: Vec<_> = s - .as_bytes() - .split(|b| *b == 0) - .map(String::from_utf8_lossy) - .collect(); - parts.pop(); - parts -} - -pub struct Context { - cancel_rx: Receiver<()>, - timeout_rx: Receiver, - timeout_at: Instant, - shared: Arc, -} - -impl Context { - pub fn new() -> (Context, CtxHandle) { - let (cancel_tx, cancel_rx) = crossbeam_channel::unbounded(); - let timeout_rx = crossbeam_channel::never(); - let timeout_at = Instant::now() + Duration::from_secs(60 * 60 * 24); - let shared = Arc::new(CtxShared::new(cancel_tx)); - ( - Context { - cancel_rx, - timeout_rx, - timeout_at, - shared: Arc::clone(&shared), - }, - CtxHandle { shared }, - ) - } - - pub fn with_timeout(timeout: Duration) -> (Context, CtxHandle) { - let (mut ctx, h) = Self::new(); - ctx.set_timeout(timeout); - (ctx, h) - } - - pub fn set_timeout(&mut self, timeout: Duration) { - self.timeout_at = Instant::now() + timeout; - self.timeout_rx = crossbeam_channel::at(self.timeout_at); - } - - pub fn cancel(&self) { - self.shared.cancel(); - } -} - -impl Clone for Context { - fn clone(&self) -> Self { - self.shared.num_chans.fetch_add(1, Ordering::SeqCst); - Self { - cancel_rx: self.cancel_rx.clone(), - timeout_rx: crossbeam_channel::at(self.timeout_at), - timeout_at: self.timeout_at, - shared: Arc::clone(&self.shared), - } - } -} - -pub struct CtxHandle { - shared: Arc, -} - -impl CtxHandle { - pub fn cancel(&self) { - self.shared.cancel(); - } -} - -struct CtxShared { - cancel_tx: Sender<()>, - num_chans: AtomicUsize, -} - -impl CtxShared { - fn new(cancel_tx: Sender<()>) -> Self { - Self { - cancel_tx, - num_chans: AtomicUsize::new(1), - } - } - - fn cancel(&self) { - for _ in 0..self.num_chans.load(Ordering::SeqCst) { - let _ = self.cancel_tx.try_send(()); - } - } -} - -type BoxedError = Box; - type SenderFunc = Box Result<(), BoxedError> + Send>; struct MsgSender(Arc>); @@ -439,91 +342,21 @@ impl MsgSender { } } -#[derive(Debug)] -pub enum Error { - SenderError(BoxedError), - WriteError(WriteSettingError), - ParseError, - TimedOut, - Canceled, -} - -impl From for Error { - fn from(v: BoxedError) -> Self { - Self::SenderError(v) - } -} - -impl std::fmt::Display for Error { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Error::SenderError(err) => write!(f, "{}", err), - Error::WriteError(err) => write!(f, "{}", err), - Error::ParseError => write!(f, "failed to parse setting"), - Error::TimedOut => write!(f, "timed out"), - Error::Canceled => write!(f, "canceled"), - } - } -} - -impl std::error::Error for Error {} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum WriteSettingError { - ValueRejected, - SettingRejected, - ParseFailed, - ReadOnly, - ModifyDisabled, - ServiceFailed, - Timeout, - Unknown, -} - -impl From for WriteSettingError { - fn from(n: u8) -> Self { - match n { - 1 => WriteSettingError::ValueRejected, - 2 => WriteSettingError::SettingRejected, - 3 => WriteSettingError::ParseFailed, - 4 => WriteSettingError::ReadOnly, - 5 => WriteSettingError::ModifyDisabled, - 6 => WriteSettingError::ServiceFailed, - 7 => WriteSettingError::Timeout, - _ => WriteSettingError::Unknown, - } - } +fn request_matches(group: &str, name: &str, setting: &SbpString, Multipart>) -> bool { + let fields = split_multipart(setting); + matches!(fields.as_slice(), [g, n, ..] if g == group && n == name) } -impl std::fmt::Display for WriteSettingError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - WriteSettingError::ValueRejected => { - write!(f, "setting value invalid",) - } - WriteSettingError::SettingRejected => { - write!(f, "setting does not exist") - } - WriteSettingError::ParseFailed => { - write!(f, "could not parse setting value ") - } - WriteSettingError::ReadOnly => { - write!(f, "setting is read only") - } - WriteSettingError::ModifyDisabled => { - write!(f, "setting is not modifiable") - } - WriteSettingError::ServiceFailed => write!(f, "system failure during setting"), - WriteSettingError::Timeout => write!(f, "request wasn't replied in time"), - WriteSettingError::Unknown => { - write!(f, "unknown settings write response") - } - } - } +fn split_multipart(s: &SbpString, Multipart>) -> Vec> { + let mut parts: Vec<_> = s + .as_bytes() + .split(|b| *b == 0) + .map(String::from_utf8_lossy) + .collect(); + parts.pop(); + parts } -impl std::error::Error for WriteSettingError {} - #[cfg(test)] mod tests { use std::io::{Read, Write}; diff --git a/src/context.rs b/src/context.rs new file mode 100644 index 0000000..b95f91f --- /dev/null +++ b/src/context.rs @@ -0,0 +1,85 @@ +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; +use std::time::{Duration, Instant}; + +use crossbeam_channel::{Receiver, Sender}; + +pub struct Context { + pub(crate) cancel_rx: Receiver<()>, + pub(crate) timeout_rx: Receiver, + timeout_at: Instant, + shared: Arc, +} + +impl Context { + pub fn new() -> (Context, CtxHandle) { + let (cancel_tx, cancel_rx) = crossbeam_channel::unbounded(); + let timeout_rx = crossbeam_channel::never(); + let timeout_at = Instant::now() + Duration::from_secs(u64::MAX); + let shared = Arc::new(Shared { + cancel_tx, + num_chans: AtomicUsize::new(1), + }); + ( + Context { + cancel_rx, + timeout_rx, + timeout_at, + shared: Arc::clone(&shared), + }, + CtxHandle { shared }, + ) + } + + pub fn with_timeout(timeout: Duration) -> (Context, CtxHandle) { + let (mut ctx, h) = Self::new(); + ctx.set_timeout(timeout); + (ctx, h) + } + + pub fn set_timeout(&mut self, timeout: Duration) { + self.timeout_at = Instant::now() + timeout; + self.timeout_rx = crossbeam_channel::at(self.timeout_at); + } + + pub fn cancel(&self) { + self.shared.cancel(); + } +} + +impl Clone for Context { + fn clone(&self) -> Self { + self.shared.num_chans.fetch_add(1, Ordering::SeqCst); + Self { + cancel_rx: self.cancel_rx.clone(), + timeout_rx: crossbeam_channel::at(self.timeout_at), + timeout_at: self.timeout_at, + shared: Arc::clone(&self.shared), + } + } +} + +pub struct CtxHandle { + shared: Arc, +} + +impl CtxHandle { + pub fn cancel(&self) { + self.shared.cancel(); + } +} + +struct Shared { + cancel_tx: Sender<()>, + num_chans: AtomicUsize, +} + +impl Shared { + fn cancel(&self) { + for _ in 0..self.num_chans.load(Ordering::SeqCst) { + let _ = self.cancel_tx.try_send(()); + } + } +} diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..9e190b7 --- /dev/null +++ b/src/error.rs @@ -0,0 +1,92 @@ +pub type BoxedError = Box; + +#[derive(Debug)] +pub enum Error { + SenderError(BoxedError), + WriteError(WriteError), + ParseError, + TimedOut, + Canceled, +} + +impl From for Error { + fn from(e: BoxedError) -> Self { + Self::SenderError(e) + } +} + +impl From for Error { + fn from(e: WriteError) -> Self { + Self::WriteError(e) + } +} + +impl std::fmt::Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Error::SenderError(err) => write!(f, "{}", err), + Error::WriteError(err) => write!(f, "{}", err), + Error::ParseError => write!(f, "failed to parse setting"), + Error::TimedOut => write!(f, "timed out"), + Error::Canceled => write!(f, "canceled"), + } + } +} + +impl std::error::Error for Error {} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum WriteError { + ValueRejected, + SettingRejected, + ParseFailed, + ReadOnly, + ModifyDisabled, + ServiceFailed, + Timeout, + Unknown, +} + +impl From for WriteError { + fn from(n: u8) -> Self { + match n { + 1 => WriteError::ValueRejected, + 2 => WriteError::SettingRejected, + 3 => WriteError::ParseFailed, + 4 => WriteError::ReadOnly, + 5 => WriteError::ModifyDisabled, + 6 => WriteError::ServiceFailed, + 7 => WriteError::Timeout, + _ => WriteError::Unknown, + } + } +} + +impl std::fmt::Display for WriteError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + WriteError::ValueRejected => { + write!(f, "setting value invalid",) + } + WriteError::SettingRejected => { + write!(f, "setting does not exist") + } + WriteError::ParseFailed => { + write!(f, "could not parse setting value ") + } + WriteError::ReadOnly => { + write!(f, "setting is read only") + } + WriteError::ModifyDisabled => { + write!(f, "setting is not modifiable") + } + WriteError::ServiceFailed => write!(f, "system failure during setting"), + WriteError::Timeout => write!(f, "request wasn't replied in time"), + WriteError::Unknown => { + write!(f, "unknown settings write response") + } + } + } +} + +impl std::error::Error for WriteError {} diff --git a/src/lib.rs b/src/lib.rs index 8293086..87df13a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,10 @@ mod client; -pub mod settings; +mod context; -pub use client::{Client, Context, Entry, Error, WriteSettingError}; -pub use settings::{Setting, SettingKind, SettingValue}; +pub mod error; +pub mod setting; + +pub use client::{Client, Entry}; +pub use context::Context; +pub use error::Error; +pub use setting::{Setting, SettingKind, SettingValue}; diff --git a/src/settings.rs b/src/setting.rs similarity index 100% rename from src/settings.rs rename to src/setting.rs diff --git a/tests/load.rs b/tests/load.rs index 5359e91..133ea32 100644 --- a/tests/load.rs +++ b/tests/load.rs @@ -1,8 +1,8 @@ -use sbp_settings::{settings, Setting, SettingKind}; +use sbp_settings::{setting, Setting, SettingKind}; #[test] fn test_load() { - settings::load(vec![Setting { + setting::load(vec![Setting { name: "aname".into(), group: "agroup".into(), kind: SettingKind::String, diff --git a/tests/load_from_path.rs b/tests/load_from_path.rs index a31a199..fa03318 100644 --- a/tests/load_from_path.rs +++ b/tests/load_from_path.rs @@ -1,8 +1,8 @@ -use sbp_settings::{settings, Setting}; +use sbp_settings::{setting, Setting}; #[test] fn test_load_from_path() { - settings::load_from_path("tests/data/settings.yaml").unwrap(); + setting::load_from_path("tests/data/settings.yaml").unwrap(); assert!(Setting::find("solution", "soln_freq").is_none()); assert!(Setting::find("surveyed_position", "broadcast").is_some()); } From 55b90ed12b1ea624b34193bf86aa207cae7cc656 Mon Sep 17 00:00:00 2001 From: Steve Meyer Date: Mon, 29 Nov 2021 12:08:54 -0800 Subject: [PATCH 11/13] fix duration addition --- src/context.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/context.rs b/src/context.rs index b95f91f..88bec09 100644 --- a/src/context.rs +++ b/src/context.rs @@ -17,7 +17,7 @@ impl Context { pub fn new() -> (Context, CtxHandle) { let (cancel_tx, cancel_rx) = crossbeam_channel::unbounded(); let timeout_rx = crossbeam_channel::never(); - let timeout_at = Instant::now() + Duration::from_secs(u64::MAX); + let timeout_at = Instant::now() + Duration::new(31_557_600_000, 0); // a millenia; let shared = Arc::new(Shared { cancel_tx, num_chans: AtomicUsize::new(1), From 6c7ed8c308c10923bcf00ddf34b7cd911ddace13 Mon Sep 17 00:00:00 2001 From: Steve Meyer Date: Mon, 29 Nov 2021 12:18:15 -0800 Subject: [PATCH 12/13] fix read resp parsing --- src/client.rs | 8 ++------ src/setting.rs | 3 +++ 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/client.rs b/src/client.rs index 488dcdb..1b27662 100644 --- a/src/client.rs +++ b/src/client.rs @@ -285,7 +285,7 @@ impl TryFrom for Entry { value: None, }) } - [group, name, value] | [group, name, value, ..] => { + [group, name, value] => { let setting = Setting::new(&group, &name); let value = SettingValue::parse(value, setting.kind); Ok(Entry { setting, value }) @@ -307,11 +307,7 @@ impl TryFrom for Entry { } else { Setting::with_fmt_type(group, name, fmt_type) }; - let value = if !value.is_empty() { - SettingValue::parse(value, setting.kind) - } else { - None - }; + let value = SettingValue::parse(value, setting.kind); Ok(Entry { setting, value }) } _ => Err(Error::ParseError), diff --git a/src/setting.rs b/src/setting.rs index 3566041..bdf755c 100644 --- a/src/setting.rs +++ b/src/setting.rs @@ -166,6 +166,9 @@ pub enum SettingValue { impl SettingValue { pub fn parse(v: &str, kind: SettingKind) -> Option { + if v.is_empty() { + return None; + } match kind { SettingKind::Integer => v.parse().ok().map(SettingValue::Integer), SettingKind::Boolean if v == "True" => Some(SettingValue::Boolean(true)), From 15df718dc5a17a572740e0f3b4a186982bf7d72b Mon Sep 17 00:00:00 2001 From: Steve Meyer Date: Tue, 30 Nov 2021 11:34:15 -0800 Subject: [PATCH 13/13] fix test? --- src/context.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/context.rs b/src/context.rs index 88bec09..c67ddba 100644 --- a/src/context.rs +++ b/src/context.rs @@ -17,7 +17,7 @@ impl Context { pub fn new() -> (Context, CtxHandle) { let (cancel_tx, cancel_rx) = crossbeam_channel::unbounded(); let timeout_rx = crossbeam_channel::never(); - let timeout_at = Instant::now() + Duration::new(31_557_600_000, 0); // a millenia; + let timeout_at = Instant::now() + Duration::new(3_154_000_000, 0); // a century; let shared = Arc::new(Shared { cancel_tx, num_chans: AtomicUsize::new(1),