From fccc930f0da37e4e97badc703ae2cc368917623b Mon Sep 17 00:00:00 2001 From: Steve Meyer Date: Thu, 26 Aug 2021 16:29:23 -0700 Subject: [PATCH 1/6] use callbacks --- console_backend/src/baseline_tab.rs | 16 +- console_backend/src/broadcaster.rs | 134 +++++++++- console_backend/src/fileio.rs | 11 +- console_backend/src/lib.rs | 63 +++++ console_backend/src/main_tab.rs | 126 +++------- console_backend/src/process_messages.rs | 315 ++++++++++++------------ console_backend/src/types.rs | 202 +++++++++++++-- 7 files changed, 582 insertions(+), 285 deletions(-) diff --git a/console_backend/src/baseline_tab.rs b/console_backend/src/baseline_tab.rs index 17bfb722e..3e9b13f7f 100644 --- a/console_backend/src/baseline_tab.rs +++ b/console_backend/src/baseline_tab.rs @@ -1,12 +1,12 @@ -use capnp::message::Builder; +use std::collections::HashMap; +use capnp::message::Builder; use log::error; use sbp::messages::{ navigation::{MsgAgeCorrections, MsgUtcTime}, orientation::MsgBaselineHeading, piksi::MsgResetFilters, }; -use std::{collections::HashMap, io::Write}; use crate::constants::*; use crate::date_conv::*; @@ -54,7 +54,7 @@ pub(crate) struct BaselineTabButtons { /// - `utc_time`: The stored monotonic Utc time. /// - `baseline_log_file`: The CsvSerializer corresponding to an open velocity log if any. /// - `week`: The stored week value from GPS Time messages. -pub struct BaselineTab<'a, S: CapnProtoSender, W: Write> { +pub struct BaselineTab<'a, S: CapnProtoSender> { age_corrections: Option, client_sender: S, heading: Option, @@ -75,15 +75,11 @@ pub struct BaselineTab<'a, S: CapnProtoSender, W: Write> { utc_time: Option, pub baseline_log_file: Option, week: Option, - wtr: MsgSender, + wtr: MsgSender, } -impl<'a, S: CapnProtoSender, W: Write> BaselineTab<'a, S, W> { - pub fn new( - shared_state: SharedState, - client_sender: S, - wtr: MsgSender, - ) -> BaselineTab<'a, S, W> { +impl<'a, S: CapnProtoSender> BaselineTab<'a, S> { + pub fn new(shared_state: SharedState, client_sender: S, wtr: MsgSender) -> BaselineTab<'a, S> { BaselineTab { age_corrections: None, client_sender, diff --git a/console_backend/src/broadcaster.rs b/console_backend/src/broadcaster.rs index c85dc8453..7aa5ff2aa 100644 --- a/console_backend/src/broadcaster.rs +++ b/console_backend/src/broadcaster.rs @@ -10,7 +10,7 @@ use sbp::{ messages::{ConcreteMessage, SBPMessage, SBP}, time::{GpsTime, GpsTimeError}, }; -use slotmap::HopSlotMap; +use slotmap::{DenseSlotMap, HopSlotMap}; type MaybeGpsTime = Option>; @@ -164,6 +164,117 @@ where } } +pub trait Handler { + fn run(&mut self, event: Event, time: MaybeGpsTime); +} + +pub struct WithTime; + +impl Handler for F +where + F: FnMut(E, MaybeGpsTime), + E: Event, +{ + fn run(&mut self, event: E, time: MaybeGpsTime) { + (self)(event, time) + } +} + +pub struct WithoutTime; + +impl Handler for F +where + F: FnMut(E), + E: Event, +{ + fn run(&mut self, event: E, _time: MaybeGpsTime) { + (self)(event) + } +} + +pub struct OnlyTime; + +impl Handler for F +where + F: FnMut(GpsTime), + E: Event, +{ + fn run(&mut self, _event: E, time: MaybeGpsTime) { + match time { + Some(Ok(time)) => (self)(time), + _ => {} + } + } +} + +struct Callback<'a> { + func: Box, + msg_types: &'static [u16], +} + +pub struct Link<'a> { + callbacks: Arc>>>, +} + +impl<'a> Link<'a> { + const CB_LOCK_FAILURE: &'static str = "failed to aquire lock on callbacks"; + + pub fn new() -> Self { + Self { + callbacks: Arc::new(Mutex::new(DenseSlotMap::with_key())), + } + } + + pub fn send(&self, message: &SBP, gps_time: MaybeGpsTime) -> bool { + let msg_type = message.get_message_type(); + let mut cbs = self.callbacks.lock().expect(Self::CB_LOCK_FAILURE); + let to_call = cbs + .values_mut() + .filter(|cb| cb.msg_types.is_empty() || cb.msg_types.iter().any(|ty| ty == &msg_type)); + let mut called = false; + for cb in to_call { + (cb.func)(message.clone(), gps_time.clone()); + called = true; + } + called + } + + pub fn register_cb(&mut self, mut handler: H) -> Key + where + H: Handler + 'a, + E: Event, + { + let mut cbs = self.callbacks.lock().expect(Self::CB_LOCK_FAILURE); + let inner = cbs.insert(Callback { + func: Box::new(move |msg, time| { + let event = E::from_sbp(msg); + handler.run(event, time) + }), + msg_types: E::MESSAGE_TYPES, + }); + Key { inner } + } + + pub fn unregister_cb(&self, key: Key) { + let mut cbs = self.callbacks.lock().expect(Self::CB_LOCK_FAILURE); + cbs.remove(key.inner); + } +} + +impl Clone for Link<'_> { + fn clone(&self) -> Self { + Self { + callbacks: Arc::clone(&self.callbacks), + } + } +} + +impl Default for Link<'_> { + fn default() -> Self { + Self::new() + } +} + #[cfg(test)] mod tests { use crossbeam::scope; @@ -174,6 +285,27 @@ mod tests { use super::*; + struct Counter { + count: usize, + } + + impl Counter { + fn obs(&mut self, _: MsgObs) { + self.count += 1; + } + } + + #[test] + fn test_dispatcher() { + let mut d = Link::new(); + let mut c = Counter { count: 0 }; + d.register_cb(|obs| c.obs(obs)); + d.send(&make_msg_obs(), None); + d.send(&make_msg_obs_dep_a(), None); + drop(d); + assert_eq!(c.count, 1); + } + #[test] fn test_broadcaster() { let b = Broadcaster::new(); diff --git a/console_backend/src/fileio.rs b/console_backend/src/fileio.rs index dfd9f839f..8fd9c67c9 100644 --- a/console_backend/src/fileio.rs +++ b/console_backend/src/fileio.rs @@ -35,17 +35,14 @@ const OFFSET_LEN: usize = 4; const NULL_SEP_LEN: usize = 1; const WRITE_REQ_OVERHEAD_LEN: usize = SEQUENCE_LEN + OFFSET_LEN + NULL_SEP_LEN; -pub struct Fileio { +pub struct Fileio { broadcast: Broadcaster, - sender: MsgSender, + sender: MsgSender, config: Option, } -impl Fileio -where - W: Write + Send, -{ - pub fn new(broadcast: Broadcaster, sender: MsgSender) -> Self { +impl Fileio { + pub fn new(broadcast: Broadcaster, sender: MsgSender) -> Self { Self { broadcast, sender, diff --git a/console_backend/src/lib.rs b/console_backend/src/lib.rs index bcb3ea327..dca0e8e22 100644 --- a/console_backend/src/lib.rs +++ b/console_backend/src/lib.rs @@ -32,6 +32,69 @@ pub mod tracking_signals_tab; pub mod types; pub mod utils; +use std::cell::RefCell; + +use crate::{ + advanced_ins_tab::AdvancedInsTab, advanced_magnetometer_tab::AdvancedMagnetometerTab, + advanced_spectrum_analyzer_tab::AdvancedSpectrumAnalyzerTab, baseline_tab::BaselineTab, + main_tab::MainTab, observation_tab::ObservationTab, solution_tab::SolutionTab, + solution_velocity_tab::SolutionVelocityTab, status_bar::StatusBar, + tracking_signals_tab::TrackingSignalsTab, +}; + +struct Tabs<'a, S: types::CapnProtoSender> { + pub main_tab: RefCell>, + pub advanced_ins_tab: RefCell>, + pub advanced_magnetometer_tab: RefCell>, + pub baseline_tab: RefCell>, + pub tracking_signals_tab: RefCell>, + pub solution_tab: RefCell>, + pub observation_tab: RefCell>, + pub solution_velocity_tab: RefCell>, + pub advanced_spectrum_analyzer_tab: RefCell>, + pub status_bar: RefCell>, +} + +impl<'a, S: types::CapnProtoSender> Tabs<'a, S> { + fn new( + shared_state: types::SharedState, + client_sender: S, + msg_sender: types::MsgSender, + ) -> Self { + Self { + main_tab: MainTab::new(shared_state.clone(), client_sender.clone()).into(), + advanced_ins_tab: AdvancedInsTab::new(shared_state.clone(), client_sender.clone()) + .into(), + advanced_magnetometer_tab: AdvancedMagnetometerTab::new( + shared_state.clone(), + client_sender.clone(), + ) + .into(), + baseline_tab: BaselineTab::new(shared_state.clone(), client_sender.clone(), msg_sender) + .into(), + tracking_signals_tab: TrackingSignalsTab::new( + shared_state.clone(), + client_sender.clone(), + ) + .into(), + observation_tab: ObservationTab::new(shared_state.clone(), client_sender.clone()) + .into(), + solution_tab: SolutionTab::new(shared_state.clone(), client_sender.clone()).into(), + solution_velocity_tab: SolutionVelocityTab::new( + shared_state.clone(), + client_sender.clone(), + ) + .into(), + advanced_spectrum_analyzer_tab: AdvancedSpectrumAnalyzerTab::new( + shared_state.clone(), + client_sender.clone(), + ) + .into(), + status_bar: StatusBar::new(shared_state.clone(), client_sender.clone()).into(), + } + } +} + #[cfg(test)] pub mod test_common { use crate::constants::*; diff --git a/console_backend/src/main_tab.rs b/console_backend/src/main_tab.rs index e970f8177..c93d252b0 100644 --- a/console_backend/src/main_tab.rs +++ b/console_backend/src/main_tab.rs @@ -1,24 +1,16 @@ +use std::{path::PathBuf, result::Result, thread::sleep, time::Instant}; + use chrono::Local; use log::{debug, error}; use sbp::{messages::SBP, time::GpsTime}; -use std::{io::Write, path::PathBuf, result::Result, thread::sleep, time::Instant}; -use crate::advanced_ins_tab::AdvancedInsTab; -use crate::advanced_magnetometer_tab::AdvancedMagnetometerTab; -use crate::advanced_spectrum_analyzer_tab::AdvancedSpectrumAnalyzerTab; -use crate::baseline_tab::BaselineTab; use crate::common_constants::SbpLogging; use crate::constants::*; -use crate::observation_tab::ObservationTab; use crate::output::*; -use crate::solution_tab::SolutionTab; -use crate::solution_velocity_tab::SolutionVelocityTab; -use crate::status_bar::StatusBar; -use crate::tracking_signals_tab::TrackingSignalsTab; use crate::types::*; use crate::utils::refresh_loggingbar; -pub struct MainTab<'a, S: CapnProtoSender, W: Write> { +pub struct MainTab { logging_directory: PathBuf, last_csv_logging: CsvLogging, last_sbp_logging: SbpLogging, @@ -27,23 +19,10 @@ pub struct MainTab<'a, S: CapnProtoSender, W: Write> { last_gps_time: Option, client_sender: S, shared_state: SharedState, - pub advanced_ins_tab: AdvancedInsTab, - pub advanced_magnetometer_tab: AdvancedMagnetometerTab, - pub baseline_tab: BaselineTab<'a, S, W>, - pub tracking_signals_tab: TrackingSignalsTab, - pub solution_tab: SolutionTab, - pub observation_tab: ObservationTab, - pub solution_velocity_tab: SolutionVelocityTab<'a, S>, - pub advanced_spectrum_analyzer_tab: AdvancedSpectrumAnalyzerTab, - pub status_bar: StatusBar, } -impl<'a, S: CapnProtoSender, W: Write> MainTab<'a, S, W> { - pub fn new( - shared_state: SharedState, - client_sender: S, - msg_sender: MsgSender, - ) -> MainTab<'a, S, W> { +impl<'a, S: CapnProtoSender> MainTab { + pub fn new(shared_state: SharedState, client_sender: S) -> MainTab { MainTab { logging_directory: shared_state.logging_directory(), last_csv_logging: CsvLogging::OFF, @@ -53,27 +32,6 @@ impl<'a, S: CapnProtoSender, W: Write> MainTab<'a, S, W> { last_gps_update: Instant::now(), client_sender: client_sender.clone(), shared_state: shared_state.clone(), - advanced_ins_tab: AdvancedInsTab::new(shared_state.clone(), client_sender.clone()), - advanced_magnetometer_tab: AdvancedMagnetometerTab::new( - shared_state.clone(), - client_sender.clone(), - ), - baseline_tab: BaselineTab::new(shared_state.clone(), client_sender.clone(), msg_sender), - tracking_signals_tab: TrackingSignalsTab::new( - shared_state.clone(), - client_sender.clone(), - ), - observation_tab: ObservationTab::new(shared_state.clone(), client_sender.clone()), - solution_tab: SolutionTab::new(shared_state.clone(), client_sender.clone()), - solution_velocity_tab: SolutionVelocityTab::new( - shared_state.clone(), - client_sender.clone(), - ), - advanced_spectrum_analyzer_tab: AdvancedSpectrumAnalyzerTab::new( - shared_state.clone(), - client_sender.clone(), - ), - status_bar: StatusBar::new(shared_state, client_sender), } } @@ -115,49 +73,27 @@ impl<'a, S: CapnProtoSender, W: Write> MainTab<'a, S, W> { /// - `logging`: The type of sbp logging to use; otherwise, None. pub fn init_csv_logging(&mut self) { let local_t = Local::now(); + let vel_log_file = local_t.format(VEL_TIME_STR_FILEPATH).to_string(); let vel_log_file = self.logging_directory.join(vel_log_file); - self.solution_tab.vel_log_file = match CsvSerializer::new(&vel_log_file) { - Ok(vel_csv) => Some(vel_csv), - Err(e) => { - error!("issue creating file, {:?}, error, {}", vel_log_file, e); - None - } - }; + self.shared_state.start_vel_log(&vel_log_file); + let pos_log_file = local_t.format(POS_LLH_TIME_STR_FILEPATH).to_string(); let pos_log_file = self.logging_directory.join(pos_log_file); - self.solution_tab.pos_log_file = match CsvSerializer::new(&pos_log_file) { - Ok(pos_csv) => Some(pos_csv), - Err(e) => { - error!("issue creating file, {:?}, error, {}", pos_log_file, e); - None - } - }; + self.shared_state.start_pos_log(&pos_log_file); + let baseline_log_file = local_t.format(BASELINE_TIME_STR_FILEPATH).to_string(); let baseline_log_file = self.logging_directory.join(baseline_log_file); - self.baseline_tab.baseline_log_file = match CsvSerializer::new(&baseline_log_file) { - Ok(baseline_csv) => Some(baseline_csv), - Err(e) => { - error!("issue creating file, {:?}, error, {}", baseline_log_file, e); - None - } - }; + self.shared_state.start_baseline_log(&baseline_log_file); + self.shared_state.set_csv_logging(CsvLogging::ON); } + pub fn end_csv_logging(&mut self) -> crate::types::Result<()> { self.shared_state.set_csv_logging(CsvLogging::OFF); - if let Some(vel_log) = &mut self.solution_tab.vel_log_file { - vel_log.flush()?; - self.solution_tab.vel_log_file = None; - } - if let Some(pos_log) = &mut self.solution_tab.pos_log_file { - pos_log.flush()?; - self.solution_tab.pos_log_file = None; - } - if let Some(baseline_log) = &mut self.baseline_tab.baseline_log_file { - baseline_log.flush()?; - self.baseline_tab.baseline_log_file = None; - } + self.shared_state.end_vel_log()?; + self.shared_state.end_pos_log()?; + self.shared_state.end_baseline_log()?; Ok(()) } @@ -253,6 +189,8 @@ impl<'a, S: CapnProtoSender, W: Write> MainTab<'a, S, W> { #[cfg(test)] mod tests { use super::*; + use crate::baseline_tab::BaselineTab; + use crate::solution_tab::SolutionTab; use crate::types::{PosLLH, VelNED}; use crate::utils::{mm_to_m, ms_to_sec}; use glob::glob; @@ -287,9 +225,7 @@ mod tests { let shared_state = SharedState::new(); let client_send = TestSender { inner: Vec::new() }; let gps_s = GpsTimeTests::new(); - let wtr = sink(); - let msg_sender = MsgSender::new(wtr); - let mut main = MainTab::new(shared_state, client_send, msg_sender); + let mut main = MainTab::new(shared_state, client_send); let early_gps_time_good = GpsTime::new(gps_s.good_week, gps_s.early_gps_tow_good).unwrap(); let later_gps_time_good = GpsTime::new(gps_s.good_week, gps_s.later_gps_tow_good); main.last_gps_time = Some(early_gps_time_good); @@ -307,9 +243,7 @@ mod tests { let shared_state = SharedState::new(); let client_send = TestSender { inner: Vec::new() }; let gps_s = GpsTimeTests::new(); - let wtr = sink(); - let msg_sender = MsgSender::new(wtr); - let mut main = MainTab::new(shared_state, client_send, msg_sender); + let mut main = MainTab::new(shared_state, client_send); let later_gps_time_good = GpsTime::new(gps_s.good_week, gps_s.later_gps_tow_good); let now = Instant::now(); main.last_gps_update = Instant::now(); @@ -325,7 +259,9 @@ mod tests { let client_send = TestSender { inner: Vec::new() }; let wtr = sink(); let msg_sender = MsgSender::new(wtr); - let mut main = MainTab::new(shared_state, client_send, msg_sender); + let mut main = MainTab::new(shared_state.clone(), client_send.clone()); + let mut solution_tab = SolutionTab::new(shared_state.clone(), client_send.clone()); + let mut baseline_tab = BaselineTab::new(shared_state, client_send, msg_sender); assert_eq!(main.last_csv_logging, CsvLogging::OFF); main.shared_state.set_csv_logging(CsvLogging::ON); main.shared_state.set_logging_directory(tmp_dir.clone()); @@ -385,13 +321,11 @@ mod tests { { main.serialize_sbp(&SBP::MsgPosLLH(msg.clone())); - main.solution_tab.handle_pos_llh(PosLLH::MsgPosLLH(msg)); + solution_tab.handle_pos_llh(PosLLH::MsgPosLLH(msg)); main.serialize_sbp(&SBP::MsgVelNED(msg_two.clone())); - main.solution_tab - .handle_vel_ned(VelNED::MsgVelNED(msg_two.clone())); + solution_tab.handle_vel_ned(VelNED::MsgVelNED(msg_two.clone())); main.serialize_sbp(&SBP::MsgBaselineNED(msg_three.clone())); - main.baseline_tab - .handle_baseline_ned(BaselineNED::MsgBaselineNED(msg_three)); + baseline_tab.handle_baseline_ned(BaselineNED::MsgBaselineNED(msg_three)); assert_eq!(main.last_csv_logging, CsvLogging::ON); main.end_csv_logging().unwrap(); main.serialize_sbp(&SBP::MsgVelNED(msg_two)); @@ -457,9 +391,7 @@ mod tests { let tmp_dir = tmp_dir.path().to_path_buf(); let shared_state = SharedState::new(); let client_send = TestSender { inner: Vec::new() }; - let wtr = sink(); - let msg_sender = MsgSender::new(wtr); - let mut main = MainTab::new(shared_state, client_send, msg_sender); + let mut main = MainTab::new(shared_state, client_send); assert_eq!(main.last_sbp_logging, SbpLogging::OFF); main.shared_state.set_sbp_logging(SbpLogging::SBP); main.shared_state.set_logging_directory(tmp_dir.clone()); @@ -549,9 +481,7 @@ mod tests { let tmp_dir = tmp_dir.path().to_path_buf(); let shared_state = SharedState::new(); let client_send = TestSender { inner: Vec::new() }; - let wtr = sink(); - let msg_sender = MsgSender::new(wtr); - let mut main = MainTab::new(shared_state, client_send, msg_sender); + let mut main = MainTab::new(shared_state, client_send); assert_eq!(main.last_sbp_logging, SbpLogging::OFF); main.shared_state.set_sbp_logging(SbpLogging::SBP_JSON); main.shared_state.set_logging_directory(tmp_dir.clone()); diff --git a/console_backend/src/process_messages.rs b/console_backend/src/process_messages.rs index f4c30f4e7..02c08b3b7 100644 --- a/console_backend/src/process_messages.rs +++ b/console_backend/src/process_messages.rs @@ -1,17 +1,28 @@ +use std::{io::ErrorKind, thread::sleep, time::Duration}; + use log::{debug, error}; -use sbp::sbp_tools::{ControlFlow, SBPTools}; use sbp::{ - messages::{SBPMessage, SBP}, + messages::{ + imu::{MsgImuAux, MsgImuRaw}, + mag::MsgMagRaw, + navigation::{MsgAgeCorrections, MsgPosLLHCov, MsgUtcTime, MsgVelNED}, + observation::MsgObsDepA, + orientation::{MsgAngularRate, MsgBaselineHeading, MsgOrientEuler}, + system::{MsgHeartbeat, MsgInsStatus, MsgInsUpdates}, + tracking::{MsgMeasurementState, MsgTrackingState}, + SBPMessage, + }, + sbp_tools::{ControlFlow, SBPTools}, serialize::SbpSerialize, }; -use std::{io::ErrorKind, thread::sleep, time::Duration}; +use crate::broadcaster::Link; use crate::connection::Connection; use crate::constants::PAUSE_LOOP_SLEEP_DURATION_MS; use crate::log_panel::handle_log_msg; -use crate::main_tab::*; use crate::types::*; use crate::utils::{close_frontend, refresh_navbar}; +use crate::Tabs; pub fn process_messages( conn: Connection, @@ -27,7 +38,7 @@ where let msg_sender = MsgSender::new(wtr); shared_state.set_current_connection(conn.name()); refresh_navbar(&mut client_send.clone(), shared_state.clone()); - let mut main = MainTab::new(shared_state.clone(), client_send.clone(), msg_sender); + let tabs = Tabs::new(shared_state.clone(), client_send.clone(), msg_sender); let messages = sbp::iter_messages(rdr) .handle_errors(|e| { debug!("{}", e); @@ -42,13 +53,143 @@ where } }) .with_rover_time(); - let mut attempt_delay; + + let mut link = Link::new(); + + link.register_cb(|msg: MsgAgeCorrections| { + tabs.baseline_tab + .borrow_mut() + .handle_age_corrections(msg.clone()); + tabs.solution_tab + .borrow_mut() + .handle_age_corrections(msg.clone()); + tabs.status_bar.borrow_mut().handle_age_corrections(msg); + }); + + link.register_cb(|msg: MsgAngularRate| { + tabs.solution_tab.borrow_mut().handle_angular_rate(msg); + }); + + link.register_cb(|msg: MsgBaselineHeading| { + tabs.baseline_tab.borrow_mut().handle_baseline_heading(msg); + }); + + link.register_cb(|msg: BaselineNED| { + tabs.baseline_tab + .borrow_mut() + .handle_baseline_ned(msg.clone()); + tabs.status_bar.borrow_mut().handle_baseline_ned(msg); + }); + + link.register_cb(|msg: Dops| { + tabs.solution_tab.borrow_mut().handle_dops(msg); + }); + + link.register_cb(|msg: GpsTime| { + tabs.baseline_tab.borrow_mut().handle_gps_time(msg.clone()); + tabs.solution_tab.borrow_mut().handle_gps_time(msg); + }); + + link.register_cb(|_: MsgHeartbeat| { + tabs.status_bar.borrow_mut().handle_heartbeat(); + }); + + link.register_cb(|msg: MsgImuAux| { + tabs.advanced_ins_tab.borrow_mut().handle_imu_aux(msg); + }); + + link.register_cb(|msg: MsgImuRaw| { + tabs.advanced_ins_tab.borrow_mut().handle_imu_raw(msg); + }); + + link.register_cb(|msg: MsgInsStatus| { + tabs.solution_tab + .borrow_mut() + .handle_ins_status(msg.clone()); + tabs.status_bar.borrow_mut().handle_ins_status(msg); + }); + + link.register_cb(|msg: MsgInsUpdates| { + tabs.advanced_ins_tab + .borrow_mut() + .fusion_engine_status_bar + .handle_ins_updates(msg.clone()); + tabs.solution_tab + .borrow_mut() + .handle_ins_updates(msg.clone()); + tabs.status_bar.borrow_mut().handle_ins_updates(msg); + }); + + link.register_cb(|msg: MsgMagRaw| { + tabs.advanced_magnetometer_tab + .borrow_mut() + .handle_mag_raw(msg); + }); + + link.register_cb(|msg: MsgMeasurementState| { + tabs.tracking_signals_tab + .borrow_mut() + .handle_msg_measurement_state(msg.states); + }); + + link.register_cb(|msg: ObservationMsg| { + tabs.tracking_signals_tab + .borrow_mut() + .handle_obs(msg.clone()); + tabs.observation_tab.borrow_mut().handle_obs(msg); + }); + + link.register_cb(|_: MsgObsDepA| { + println!("The message type, MsgObsDepA, is not handled in the Tracking->SignalsPlot or Observation tab."); + }); + + link.register_cb(|msg: MsgOrientEuler| { + tabs.solution_tab.borrow_mut().handle_orientation_euler(msg); + }); + + link.register_cb(|msg: PosLLH| { + tabs.solution_tab.borrow_mut().handle_pos_llh(msg.clone()); + tabs.status_bar.borrow_mut().handle_pos_llh(msg); + }); + + link.register_cb(|msg: MsgPosLLHCov| { + tabs.solution_tab.borrow_mut().handle_pos_llh_cov(msg); + }); + + link.register_cb(|msg: Specan| { + tabs.advanced_spectrum_analyzer_tab + .borrow_mut() + .handle_specan(msg); + }); + + link.register_cb(|msg: MsgTrackingState| { + tabs.tracking_signals_tab + .borrow_mut() + .handle_msg_tracking_state(msg.states); + }); + + link.register_cb(|msg: VelNED| { + tabs.solution_tab.borrow_mut().handle_vel_ned(msg); + }); + + link.register_cb(|msg: MsgVelNED| { + // why does this tab not take both VelNED messages? + tabs.solution_velocity_tab.borrow_mut().handle_vel_ned(msg); + }); + + link.register_cb(|msg: MsgUtcTime| { + tabs.baseline_tab.borrow_mut().handle_utc_time(msg.clone()); + tabs.solution_tab.borrow_mut().handle_utc_time(msg); + }); + + link.register_cb(handle_log_msg); + for (message, gps_time) in messages { if !shared_state.is_running() { - if let Err(e) = main.end_csv_logging() { + if let Err(e) = tabs.main_tab.borrow_mut().end_csv_logging() { error!("Issue closing csv file, {}", e); } - main.close_sbp(); + tabs.main_tab.borrow_mut().close_sbp(); break; } if shared_state.is_paused() { @@ -59,162 +200,26 @@ where sleep(Duration::from_millis(PAUSE_LOOP_SLEEP_DURATION_MS)); } } - main.serialize_sbp(&message); - let msg_name = message.get_message_name(); - main.status_bar.add_bytes(message.sbp_size()); - attempt_delay = true; - match message { - SBP::MsgAgeCorrections(msg) => { - main.baseline_tab.handle_age_corrections(msg.clone()); - main.solution_tab.handle_age_corrections(msg.clone()); - main.status_bar.handle_age_corrections(msg); - } - SBP::MsgAngularRate(msg) => { - main.solution_tab.handle_angular_rate(msg); - } - SBP::MsgBaselineHeading(msg) => { - main.baseline_tab.handle_baseline_heading(msg); - } - SBP::MsgBaselineNED(msg) => { - main.baseline_tab - .handle_baseline_ned(BaselineNED::MsgBaselineNED(msg.clone())); - main.status_bar - .handle_baseline_ned(BaselineNED::MsgBaselineNED(msg)); - } - SBP::MsgBaselineNEDDepA(msg) => { - main.baseline_tab - .handle_baseline_ned(BaselineNED::MsgBaselineNEDDepA(msg.clone())); - main.status_bar - .handle_baseline_ned(BaselineNED::MsgBaselineNEDDepA(msg)); - } - SBP::MsgDops(msg) => { - main.solution_tab.handle_dops(Dops::MsgDops(msg)); - } - SBP::MsgDopsDepA(msg) => { - main.solution_tab.handle_dops(Dops::MsgDopsDepA(msg)); - } - SBP::MsgGPSTime(msg) => { - main.baseline_tab - .handle_gps_time(GpsTime::MsgGpsTime(msg.clone())); - main.solution_tab.handle_gps_time(GpsTime::MsgGpsTime(msg)); - } - SBP::MsgGPSTimeDepA(msg) => { - main.baseline_tab - .handle_gps_time(GpsTime::MsgGpsTimeDepA(msg.clone())); - main.solution_tab - .handle_gps_time(GpsTime::MsgGpsTimeDepA(msg)); - } - SBP::MsgHeartbeat(_) => { - main.status_bar.handle_heartbeat(); - } - SBP::MsgImuAux(msg) => { - main.advanced_ins_tab.handle_imu_aux(msg); - } - SBP::MsgImuRaw(msg) => { - main.advanced_ins_tab.handle_imu_raw(msg); - } - SBP::MsgInsStatus(msg) => { - main.solution_tab.handle_ins_status(msg.clone()); - main.status_bar.handle_ins_status(msg); - } - SBP::MsgInsUpdates(msg) => { - main.advanced_ins_tab - .fusion_engine_status_bar - .handle_ins_updates(msg.clone()); - main.solution_tab.handle_ins_updates(msg.clone()); - main.status_bar.handle_ins_updates(msg); - } - SBP::MsgMagRaw(msg) => { - main.advanced_magnetometer_tab.handle_mag_raw(msg); - } - SBP::MsgMeasurementState(msg) => { - main.tracking_signals_tab - .handle_msg_measurement_state(msg.states); - } - SBP::MsgObs(msg) => { - main.tracking_signals_tab - .handle_obs(ObservationMsg::MsgObs(msg.clone())); - main.observation_tab.handle_obs(ObservationMsg::MsgObs(msg)); - } - SBP::MsgObsDepA(_msg) => { - //CPP-85 Unhandled for tracking signals plot tab. - println!("The message type, MsgObsDepA, is not handled in the Tracking->SignalsPlot or Observation tab."); - } - SBP::MsgObsDepB(msg) => { - main.tracking_signals_tab - .handle_obs(ObservationMsg::MsgObsDepB(msg.clone())); - - main.observation_tab - .handle_obs(ObservationMsg::MsgObsDepB(msg)); - } - SBP::MsgObsDepC(msg) => { - main.tracking_signals_tab - .handle_obs(ObservationMsg::MsgObsDepC(msg.clone())); - - main.observation_tab - .handle_obs(ObservationMsg::MsgObsDepC(msg)); - } - SBP::MsgOrientEuler(msg) => { - main.solution_tab.handle_orientation_euler(msg); - } - SBP::MsgOsr(msg) => { - main.observation_tab.handle_obs(ObservationMsg::MsgOsr(msg)); - } - SBP::MsgPosLLH(msg) => { - main.solution_tab - .handle_pos_llh(PosLLH::MsgPosLLH(msg.clone())); - main.status_bar.handle_pos_llh(PosLLH::MsgPosLLH(msg)); - } - SBP::MsgPosLLHDepA(msg) => { - main.solution_tab - .handle_pos_llh(PosLLH::MsgPosLLHDepA(msg.clone())); - main.status_bar.handle_pos_llh(PosLLH::MsgPosLLHDepA(msg)); - } - SBP::MsgPosLLHCov(msg) => { - main.solution_tab.handle_pos_llh_cov(msg); - } - SBP::MsgSpecan(msg) => { - main.advanced_spectrum_analyzer_tab - .handle_specan(Specan::MsgSpecan(msg)); - } - SBP::MsgSpecanDep(msg) => { - main.advanced_spectrum_analyzer_tab - .handle_specan(Specan::MsgSpecanDep(msg)); - } - SBP::MsgTrackingState(msg) => { - main.tracking_signals_tab - .handle_msg_tracking_state(msg.states); - } - SBP::MsgVelNED(msg) => { - main.solution_tab - .handle_vel_ned(VelNED::MsgVelNED(msg.clone())); - main.solution_velocity_tab.handle_vel_ned(msg); - } - SBP::MsgVelNEDDepA(msg) => { - main.solution_tab.handle_vel_ned(VelNED::MsgVelNEDDepA(msg)); - } - SBP::MsgUtcTime(msg) => { - main.baseline_tab.handle_utc_time(msg.clone()); - main.solution_tab.handle_utc_time(msg); - } - SBP::MsgLog(msg) => handle_log_msg(msg), - - _ => { - attempt_delay = false; - } - } + tabs.main_tab.borrow_mut().serialize_sbp(&message); + tabs.status_bar.borrow_mut().add_bytes(message.sbp_size()); + let sent = link.send(&message, gps_time.clone()); if let RealtimeDelay::On = realtime_delay { - if attempt_delay { - main.realtime_delay(gps_time); + if sent { + tabs.main_tab.borrow_mut().realtime_delay(gps_time); } else { - debug!("Message, {}, ignored for realtime delay.", msg_name); + debug!( + "Message, {}, ignored for realtime delay.", + message.get_message_name() + ); } } log::logger().flush(); } + if conn.close_when_done() { shared_state.set_running(false, client_send.clone()); close_frontend(&mut client_send); } + Ok(()) } diff --git a/console_backend/src/types.rs b/console_backend/src/types.rs index 96c4b35ee..0de8d870b 100644 --- a/console_backend/src/types.rs +++ b/console_backend/src/types.rs @@ -1,8 +1,9 @@ +use crate::broadcaster::Event; use crate::common_constants::{self as cc, SbpLogging}; use crate::constants::*; use crate::errors::*; use crate::log_panel::LogLevel; -use crate::output::CsvLogging; +use crate::output::{CsvLogging, CsvSerializer}; use crate::piksi_tools_constants::*; use crate::utils::{mm_to_m, ms_to_sec, set_connected_frontend}; @@ -16,7 +17,6 @@ use ordered_float::OrderedFloat; use sbp::codec::dencode::{FramedWrite, IterSinkExt}; use sbp::codec::sbp::SbpEncoder; use sbp::messages::piksi::{MsgSpecan, MsgSpecanDep}; -use sbp::messages::SBPMessage; use sbp::messages::{ navigation::{ MsgBaselineNED, MsgBaselineNEDDepA, MsgDops, MsgDopsDepA, MsgGPSTime, MsgGPSTimeDepA, @@ -28,8 +28,11 @@ use sbp::messages::{ }, SBP, }; +use sbp::messages::{ConcreteMessage, SBPMessage}; use serde::{Deserialize, Serialize}; use serialport::FlowControl as SPFlowControl; +use std::io; +use std::path::Path; use std::{ cmp::{Eq, PartialEq}, collections::HashMap, @@ -53,18 +56,24 @@ pub type Result = std::result::Result; pub type UtcDateTime = DateTime; /// Sends SBP messages to the connected device -pub struct MsgSender { - inner: Arc>>, +pub struct MsgSender { + inner: Arc, SbpEncoder>>>, } -impl MsgSender { +impl MsgSender { /// 42 is the conventional sender ID intended for messages sent from the host to the device const SENDER_ID: u16 = 42; const LOCK_FAILURE: &'static str = "failed to aquire sender lock"; - pub fn new(wtr: W) -> Self { + pub fn new(wtr: W) -> Self + where + W: io::Write + Send + 'static, + { Self { - inner: Arc::new(Mutex::new(FramedWrite::new(wtr, SbpEncoder::new()))), + inner: Arc::new(Mutex::new(FramedWrite::new( + Box::new(wtr), + SbpEncoder::new(), + ))), } } @@ -76,7 +85,7 @@ impl MsgSender { } } -impl Clone for MsgSender { +impl Clone for MsgSender { fn clone(&self) -> Self { MsgSender { inner: Arc::clone(&self.inner), @@ -298,6 +307,60 @@ impl SharedState { let mut shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); (*shared_data).connection_history.record_address(host, port); } + pub fn start_vel_log(&self, path: &Path) { + let mut shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); + (*shared_data).solution_tab.velocity_tab.log_file = match CsvSerializer::new(path) { + Ok(vel_csv) => Some(vel_csv), + Err(e) => { + error!("issue creating file, {:?}, error, {}", path, e); + None + } + } + } + pub fn end_vel_log(&self) -> Result<()> { + let mut shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); + if let Some(ref mut log) = (*shared_data).solution_tab.velocity_tab.log_file { + log.flush()?; + } + (*shared_data).solution_tab.velocity_tab.log_file = None; + Ok(()) + } + pub fn start_pos_log(&self, path: &Path) { + let mut shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); + (*shared_data).solution_tab.position_tab.log_file = match CsvSerializer::new(path) { + Ok(vel_csv) => Some(vel_csv), + Err(e) => { + error!("issue creating file, {:?}, error, {}", path, e); + None + } + } + } + pub fn end_pos_log(&self) -> Result<()> { + let mut shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); + if let Some(ref mut log) = (*shared_data).solution_tab.position_tab.log_file { + log.flush()?; + } + (*shared_data).solution_tab.position_tab.log_file = None; + Ok(()) + } + pub fn start_baseline_log(&self, path: &Path) { + let mut shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); + (*shared_data).baseline_tab.log_file = match CsvSerializer::new(path) { + Ok(vel_csv) => Some(vel_csv), + Err(e) => { + error!("issue creating file, {:?}, error, {}", path, e); + None + } + } + } + pub fn end_baseline_log(&self) -> Result<()> { + let mut shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); + if let Some(ref mut log) = (*shared_data).baseline_tab.log_file { + log.flush()?; + } + (*shared_data).baseline_tab.log_file = None; + Ok(()) + } } impl Deref for SharedState { @@ -439,6 +502,7 @@ pub struct BaselineTabState { pub(crate) clear: bool, pub(crate) pause: bool, pub(crate) reset: bool, + pub(crate) log_file: Option, } impl BaselineTabState { @@ -447,6 +511,7 @@ impl BaselineTabState { clear: false, pause: false, reset: false, + log_file: None, } } } @@ -474,6 +539,7 @@ pub struct SolutionPositionTabState { pub last_odo_update_time: Instant, pub pause: bool, pub unit: String, + pub log_file: Option, } impl SolutionPositionTabState { @@ -485,6 +551,7 @@ impl SolutionPositionTabState { last_odo_update_time: Instant::now(), pause: false, unit: String::from(DEGREES), + log_file: None, } } } @@ -492,12 +559,14 @@ impl SolutionPositionTabState { #[derive(Debug)] pub struct SolutionVelocityTabState { pub unit: String, + pub log_file: Option, } impl SolutionVelocityTabState { fn new() -> SolutionVelocityTabState { SolutionVelocityTabState { unit: String::from(MPS), + log_file: None, } } } @@ -1163,7 +1232,9 @@ pub struct ObservationMsgFields { pub states: Vec, pub sender_id: Option, } + // Enum wrapping around various Observation Message types. +#[derive(Debug, Clone)] pub enum ObservationMsg { MsgObs(MsgObs), // MsgObsDepA(MsgObsDepA), @@ -1171,6 +1242,7 @@ pub enum ObservationMsg { MsgObsDepC(MsgObsDepC), MsgOsr(MsgOsr), } + impl ObservationMsg { pub fn fields(&self) -> ObservationMsgFields { let (n_obs, tow, wn, ns_residual, states, sender_id) = match &self { @@ -1251,6 +1323,26 @@ impl ObservationMsg { } } } + +impl Event for ObservationMsg { + const MESSAGE_TYPES: &'static [u16] = &[ + MsgObs::MESSAGE_TYPE, + MsgObsDepB::MESSAGE_TYPE, + MsgObsDepC::MESSAGE_TYPE, + MsgOsr::MESSAGE_TYPE, + ]; + + fn from_sbp(msg: SBP) -> Self { + match msg { + SBP::MsgObs(m) => ObservationMsg::MsgObs(m), + SBP::MsgObsDepB(m) => ObservationMsg::MsgObsDepB(m), + SBP::MsgObsDepC(m) => ObservationMsg::MsgObsDepC(m), + SBP::MsgOsr(m) => ObservationMsg::MsgOsr(m), + _ => unreachable!(), + } + } +} + // Struct with shared fields for various Observation Contents types. pub struct ObservationFields { pub is_deprecated_msg_type: bool, @@ -1264,6 +1356,7 @@ pub struct ObservationFields { pub lock: u16, pub flags: u8, } + // Enum wrapping around various Observation Contents observation types. pub enum Observations { PackedObsContent(PackedObsContent), @@ -1272,6 +1365,7 @@ pub enum Observations { PackedObsContentDepC(PackedObsContentDepC), PackedOsrContent(PackedOsrContent), } + impl Observations { pub fn fields(&self) -> ObservationFields { // DEP_B and DEP_A obs had different pseudorange scaling @@ -1468,13 +1562,15 @@ pub struct PosLLHFields { pub height: f64, pub n_sats: u8, } + // Enum wrapping around various PosLLH Message types. -#[derive(Debug)] +#[derive(Debug, Clone)] #[allow(clippy::upper_case_acronyms)] pub enum PosLLH { MsgPosLLH(MsgPosLLH), MsgPosLLHDepA(MsgPosLLHDepA), } + impl PosLLH { pub fn fields(&self) -> PosLLHFields { match self { @@ -1526,6 +1622,19 @@ impl PosLLH { } } } + +impl Event for PosLLH { + const MESSAGE_TYPES: &'static [u16] = &[MsgPosLLH::MESSAGE_TYPE, MsgPosLLHDepA::MESSAGE_TYPE]; + + fn from_sbp(msg: SBP) -> Self { + match msg { + SBP::MsgPosLLH(m) => PosLLH::MsgPosLLH(m), + SBP::MsgPosLLHDepA(m) => PosLLH::MsgPosLLHDepA(m), + _ => unreachable!(), + } + } +} + // Struct with shared fields for various Dops Message types. pub struct DopsFields { pub pdop: u16, @@ -1536,7 +1645,7 @@ pub struct DopsFields { pub flags: u8, } // Enum wrapping around various Dops Message types. -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum Dops { MsgDops(MsgDops), MsgDopsDepA(MsgDopsDepA), @@ -1563,6 +1672,18 @@ impl Dops { } } +impl Event for Dops { + const MESSAGE_TYPES: &'static [u16] = &[MsgDops::MESSAGE_TYPE, MsgDopsDepA::MESSAGE_TYPE]; + + fn from_sbp(msg: SBP) -> Self { + match msg { + SBP::MsgDops(m) => Dops::MsgDops(m), + SBP::MsgDopsDepA(m) => Dops::MsgDopsDepA(m), + _ => unreachable!(), + } + } +} + // Struct with shared fields for various GpsTime Message types. pub struct GpsTimeFields { pub wn: u16, @@ -1570,7 +1691,7 @@ pub struct GpsTimeFields { pub flags: u8, } // Enum wrapping around various GpsTime Message types. -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum GpsTime { MsgGpsTime(MsgGPSTime), MsgGpsTimeDepA(MsgGPSTimeDepA), @@ -1590,6 +1711,18 @@ impl GpsTime { } } +impl Event for GpsTime { + const MESSAGE_TYPES: &'static [u16] = &[MsgGPSTime::MESSAGE_TYPE, MsgGPSTimeDepA::MESSAGE_TYPE]; + + fn from_sbp(msg: SBP) -> Self { + match msg { + SBP::MsgGPSTime(m) => GpsTime::MsgGpsTime(m), + SBP::MsgGPSTimeDepA(m) => GpsTime::MsgGpsTimeDepA(m), + _ => unreachable!(), + } + } +} + // Struct with shared fields for various Specan Message types. pub struct SpecanFields { pub wn: u16, @@ -1602,8 +1735,9 @@ pub struct SpecanFields { pub amplitude_unit: f32, pub channel_tag: u16, } + // Enum wrapping around various Specan Message types. -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum Specan { MsgSpecan(MsgSpecan), MsgSpecanDep(MsgSpecanDep), @@ -1659,6 +1793,18 @@ impl Specan { } } +impl Event for Specan { + const MESSAGE_TYPES: &'static [u16] = &[MsgSpecan::MESSAGE_TYPE, MsgSpecanDep::MESSAGE_TYPE]; + + fn from_sbp(msg: SBP) -> Self { + match msg { + SBP::MsgSpecan(m) => Specan::MsgSpecan(m), + SBP::MsgSpecanDep(m) => Specan::MsgSpecanDep(m), + _ => unreachable!(), + } + } +} + // Struct with shared fields for various VelNED Message types. #[allow(clippy::upper_case_acronyms)] pub struct VelNEDFields { @@ -1669,8 +1815,9 @@ pub struct VelNEDFields { pub d: i32, pub n_sats: u8, } + // Enum wrapping around various Vel NED Message types. -#[derive(Debug)] +#[derive(Debug, Clone)] #[allow(clippy::upper_case_acronyms)] pub enum VelNED { MsgVelNED(MsgVelNED), @@ -1694,6 +1841,18 @@ impl VelNED { } } +impl Event for VelNED { + const MESSAGE_TYPES: &'static [u16] = &[MsgVelNED::MESSAGE_TYPE, MsgVelNEDDepA::MESSAGE_TYPE]; + + fn from_sbp(msg: SBP) -> Self { + match msg { + SBP::MsgVelNED(m) => VelNED::MsgVelNED(m), + SBP::MsgVelNEDDepA(m) => VelNED::MsgVelNEDDepA(m), + _ => unreachable!(), + } + } +} + // Baseline Tab Types. // Struct with shared fields for various BaselineNED Message types. @@ -1709,7 +1868,7 @@ pub struct BaselineNEDFields { pub n_sats: u8, } // Enum wrapping around various Baseline NED Message types. -#[derive(Debug)] +#[derive(Debug, Clone)] #[allow(clippy::upper_case_acronyms)] pub enum BaselineNED { MsgBaselineNED(MsgBaselineNED), @@ -1759,6 +1918,21 @@ impl BaselineNED { } } +impl Event for BaselineNED { + const MESSAGE_TYPES: &'static [u16] = &[ + MsgBaselineNED::MESSAGE_TYPE, + MsgBaselineNEDDepA::MESSAGE_TYPE, + ]; + + fn from_sbp(msg: SBP) -> Self { + match msg { + SBP::MsgBaselineNED(m) => BaselineNED::MsgBaselineNED(m), + SBP::MsgBaselineNEDDepA(m) => BaselineNED::MsgBaselineNEDDepA(m), + _ => unreachable!(), + } + } +} + // Solution Velocity Tab Types. #[derive(Debug, Clone, PartialEq)] pub enum VelocityUnits { From f3f651d63cc73d8b1a78805cee69844c7c2c1c31 Mon Sep 17 00:00:00 2001 From: Steve Meyer Date: Thu, 26 Aug 2021 16:32:26 -0700 Subject: [PATCH 2/6] cleanup --- console_backend/src/lib.rs | 43 +++++++-------- console_backend/src/process_messages.rs | 70 +++++++++++-------------- 2 files changed, 49 insertions(+), 64 deletions(-) diff --git a/console_backend/src/lib.rs b/console_backend/src/lib.rs index dca0e8e22..4da86696e 100644 --- a/console_backend/src/lib.rs +++ b/console_backend/src/lib.rs @@ -43,15 +43,15 @@ use crate::{ }; struct Tabs<'a, S: types::CapnProtoSender> { - pub main_tab: RefCell>, - pub advanced_ins_tab: RefCell>, - pub advanced_magnetometer_tab: RefCell>, - pub baseline_tab: RefCell>, - pub tracking_signals_tab: RefCell>, - pub solution_tab: RefCell>, - pub observation_tab: RefCell>, - pub solution_velocity_tab: RefCell>, - pub advanced_spectrum_analyzer_tab: RefCell>, + pub main: RefCell>, + pub advanced_ins: RefCell>, + pub advanced_magnetometer: RefCell>, + pub baseline: RefCell>, + pub tracking_signals: RefCell>, + pub solution: RefCell>, + pub observation: RefCell>, + pub solution_velocity: RefCell>, + pub advanced_spectrum_analyzer: RefCell>, pub status_bar: RefCell>, } @@ -62,35 +62,30 @@ impl<'a, S: types::CapnProtoSender> Tabs<'a, S> { msg_sender: types::MsgSender, ) -> Self { Self { - main_tab: MainTab::new(shared_state.clone(), client_sender.clone()).into(), - advanced_ins_tab: AdvancedInsTab::new(shared_state.clone(), client_sender.clone()) - .into(), - advanced_magnetometer_tab: AdvancedMagnetometerTab::new( + main: MainTab::new(shared_state.clone(), client_sender.clone()).into(), + advanced_ins: AdvancedInsTab::new(shared_state.clone(), client_sender.clone()).into(), + advanced_magnetometer: AdvancedMagnetometerTab::new( shared_state.clone(), client_sender.clone(), ) .into(), - baseline_tab: BaselineTab::new(shared_state.clone(), client_sender.clone(), msg_sender) + baseline: BaselineTab::new(shared_state.clone(), client_sender.clone(), msg_sender) .into(), - tracking_signals_tab: TrackingSignalsTab::new( - shared_state.clone(), - client_sender.clone(), - ) - .into(), - observation_tab: ObservationTab::new(shared_state.clone(), client_sender.clone()) + tracking_signals: TrackingSignalsTab::new(shared_state.clone(), client_sender.clone()) .into(), - solution_tab: SolutionTab::new(shared_state.clone(), client_sender.clone()).into(), - solution_velocity_tab: SolutionVelocityTab::new( + observation: ObservationTab::new(shared_state.clone(), client_sender.clone()).into(), + solution: SolutionTab::new(shared_state.clone(), client_sender.clone()).into(), + solution_velocity: SolutionVelocityTab::new( shared_state.clone(), client_sender.clone(), ) .into(), - advanced_spectrum_analyzer_tab: AdvancedSpectrumAnalyzerTab::new( + advanced_spectrum_analyzer: AdvancedSpectrumAnalyzerTab::new( shared_state.clone(), client_sender.clone(), ) .into(), - status_bar: StatusBar::new(shared_state.clone(), client_sender.clone()).into(), + status_bar: StatusBar::new(shared_state, client_sender).into(), } } } diff --git a/console_backend/src/process_messages.rs b/console_backend/src/process_messages.rs index 02c08b3b7..6d9b9b4a9 100644 --- a/console_backend/src/process_messages.rs +++ b/console_backend/src/process_messages.rs @@ -57,37 +57,35 @@ where let mut link = Link::new(); link.register_cb(|msg: MsgAgeCorrections| { - tabs.baseline_tab + tabs.baseline .borrow_mut() .handle_age_corrections(msg.clone()); - tabs.solution_tab + tabs.solution .borrow_mut() .handle_age_corrections(msg.clone()); tabs.status_bar.borrow_mut().handle_age_corrections(msg); }); link.register_cb(|msg: MsgAngularRate| { - tabs.solution_tab.borrow_mut().handle_angular_rate(msg); + tabs.solution.borrow_mut().handle_angular_rate(msg); }); link.register_cb(|msg: MsgBaselineHeading| { - tabs.baseline_tab.borrow_mut().handle_baseline_heading(msg); + tabs.baseline.borrow_mut().handle_baseline_heading(msg); }); link.register_cb(|msg: BaselineNED| { - tabs.baseline_tab - .borrow_mut() - .handle_baseline_ned(msg.clone()); + tabs.baseline.borrow_mut().handle_baseline_ned(msg.clone()); tabs.status_bar.borrow_mut().handle_baseline_ned(msg); }); link.register_cb(|msg: Dops| { - tabs.solution_tab.borrow_mut().handle_dops(msg); + tabs.solution.borrow_mut().handle_dops(msg); }); link.register_cb(|msg: GpsTime| { - tabs.baseline_tab.borrow_mut().handle_gps_time(msg.clone()); - tabs.solution_tab.borrow_mut().handle_gps_time(msg); + tabs.baseline.borrow_mut().handle_gps_time(msg.clone()); + tabs.solution.borrow_mut().handle_gps_time(msg); }); link.register_cb(|_: MsgHeartbeat| { @@ -95,48 +93,40 @@ where }); link.register_cb(|msg: MsgImuAux| { - tabs.advanced_ins_tab.borrow_mut().handle_imu_aux(msg); + tabs.advanced_ins.borrow_mut().handle_imu_aux(msg); }); link.register_cb(|msg: MsgImuRaw| { - tabs.advanced_ins_tab.borrow_mut().handle_imu_raw(msg); + tabs.advanced_ins.borrow_mut().handle_imu_raw(msg); }); link.register_cb(|msg: MsgInsStatus| { - tabs.solution_tab - .borrow_mut() - .handle_ins_status(msg.clone()); + tabs.solution.borrow_mut().handle_ins_status(msg.clone()); tabs.status_bar.borrow_mut().handle_ins_status(msg); }); link.register_cb(|msg: MsgInsUpdates| { - tabs.advanced_ins_tab + tabs.advanced_ins .borrow_mut() .fusion_engine_status_bar .handle_ins_updates(msg.clone()); - tabs.solution_tab - .borrow_mut() - .handle_ins_updates(msg.clone()); + tabs.solution.borrow_mut().handle_ins_updates(msg.clone()); tabs.status_bar.borrow_mut().handle_ins_updates(msg); }); link.register_cb(|msg: MsgMagRaw| { - tabs.advanced_magnetometer_tab - .borrow_mut() - .handle_mag_raw(msg); + tabs.advanced_magnetometer.borrow_mut().handle_mag_raw(msg); }); link.register_cb(|msg: MsgMeasurementState| { - tabs.tracking_signals_tab + tabs.tracking_signals .borrow_mut() .handle_msg_measurement_state(msg.states); }); link.register_cb(|msg: ObservationMsg| { - tabs.tracking_signals_tab - .borrow_mut() - .handle_obs(msg.clone()); - tabs.observation_tab.borrow_mut().handle_obs(msg); + tabs.tracking_signals.borrow_mut().handle_obs(msg.clone()); + tabs.observation.borrow_mut().handle_obs(msg); }); link.register_cb(|_: MsgObsDepA| { @@ -144,52 +134,52 @@ where }); link.register_cb(|msg: MsgOrientEuler| { - tabs.solution_tab.borrow_mut().handle_orientation_euler(msg); + tabs.solution.borrow_mut().handle_orientation_euler(msg); }); link.register_cb(|msg: PosLLH| { - tabs.solution_tab.borrow_mut().handle_pos_llh(msg.clone()); + tabs.solution.borrow_mut().handle_pos_llh(msg.clone()); tabs.status_bar.borrow_mut().handle_pos_llh(msg); }); link.register_cb(|msg: MsgPosLLHCov| { - tabs.solution_tab.borrow_mut().handle_pos_llh_cov(msg); + tabs.solution.borrow_mut().handle_pos_llh_cov(msg); }); link.register_cb(|msg: Specan| { - tabs.advanced_spectrum_analyzer_tab + tabs.advanced_spectrum_analyzer .borrow_mut() .handle_specan(msg); }); link.register_cb(|msg: MsgTrackingState| { - tabs.tracking_signals_tab + tabs.tracking_signals .borrow_mut() .handle_msg_tracking_state(msg.states); }); link.register_cb(|msg: VelNED| { - tabs.solution_tab.borrow_mut().handle_vel_ned(msg); + tabs.solution.borrow_mut().handle_vel_ned(msg); }); link.register_cb(|msg: MsgVelNED| { // why does this tab not take both VelNED messages? - tabs.solution_velocity_tab.borrow_mut().handle_vel_ned(msg); + tabs.solution_velocity.borrow_mut().handle_vel_ned(msg); }); link.register_cb(|msg: MsgUtcTime| { - tabs.baseline_tab.borrow_mut().handle_utc_time(msg.clone()); - tabs.solution_tab.borrow_mut().handle_utc_time(msg); + tabs.baseline.borrow_mut().handle_utc_time(msg.clone()); + tabs.solution.borrow_mut().handle_utc_time(msg); }); link.register_cb(handle_log_msg); for (message, gps_time) in messages { if !shared_state.is_running() { - if let Err(e) = tabs.main_tab.borrow_mut().end_csv_logging() { + if let Err(e) = tabs.main.borrow_mut().end_csv_logging() { error!("Issue closing csv file, {}", e); } - tabs.main_tab.borrow_mut().close_sbp(); + tabs.main.borrow_mut().close_sbp(); break; } if shared_state.is_paused() { @@ -200,12 +190,12 @@ where sleep(Duration::from_millis(PAUSE_LOOP_SLEEP_DURATION_MS)); } } - tabs.main_tab.borrow_mut().serialize_sbp(&message); + tabs.main.borrow_mut().serialize_sbp(&message); tabs.status_bar.borrow_mut().add_bytes(message.sbp_size()); let sent = link.send(&message, gps_time.clone()); if let RealtimeDelay::On = realtime_delay { if sent { - tabs.main_tab.borrow_mut().realtime_delay(gps_time); + tabs.main.borrow_mut().realtime_delay(gps_time); } else { debug!( "Message, {}, ignored for realtime delay.", From 8a963ae55e013155aa6269618500915bd9074124 Mon Sep 17 00:00:00 2001 From: Steve Meyer Date: Thu, 26 Aug 2021 16:46:18 -0700 Subject: [PATCH 3/6] fix logging --- console_backend/src/baseline_tab.rs | 40 +++++++-------- console_backend/src/solution_tab.rs | 80 +++++++++++++++-------------- 2 files changed, 61 insertions(+), 59 deletions(-) diff --git a/console_backend/src/baseline_tab.rs b/console_backend/src/baseline_tab.rs index 3e9b13f7f..82bfdc5f9 100644 --- a/console_backend/src/baseline_tab.rs +++ b/console_backend/src/baseline_tab.rs @@ -10,7 +10,7 @@ use sbp::messages::{ use crate::constants::*; use crate::date_conv::*; -use crate::output::{BaselineLog, CsvSerializer}; +use crate::output::BaselineLog; use crate::piksi_tools_constants::EMPTY_STR; use crate::types::{ BaselineNED, CapnProtoSender, Deque, GnssModes, GpsTime, MsgSender, Result, SharedState, @@ -52,7 +52,6 @@ pub(crate) struct BaselineTabButtons { /// - `table`: This stores all the key/value pairs to be displayed in the Baseline Table. /// - `utc_source`: The string equivalent for the source of the UTC updates. /// - `utc_time`: The stored monotonic Utc time. -/// - `baseline_log_file`: The CsvSerializer corresponding to an open velocity log if any. /// - `week`: The stored week value from GPS Time messages. pub struct BaselineTab<'a, S: CapnProtoSender> { age_corrections: Option, @@ -73,7 +72,6 @@ pub struct BaselineTab<'a, S: CapnProtoSender> { table: HashMap<&'a str, String>, utc_source: Option, utc_time: Option, - pub baseline_log_file: Option, week: Option, wtr: MsgSender, } @@ -118,7 +116,6 @@ impl<'a, S: CapnProtoSender> BaselineTab<'a, S> { }, utc_source: None, utc_time: None, - baseline_log_file: None, week: None, wtr, } @@ -290,22 +287,25 @@ impl<'a, S: CapnProtoSender> BaselineTab<'a, S> { } } - if let Some(baseline_file) = &mut self.baseline_log_file { - let pc_time = format!("{}:{:0>6.06}", tloc, secloc); - if let Err(err) = baseline_file.serialize(&BaselineLog { - pc_time, - gps_time, - tow_s: Some(tow), - north_m: Some(n), - east_m: Some(e), - down_m: Some(d), - h_accuracy_m: Some(h_accuracy), - v_accuracy_m: Some(v_accuracy), - distance_m: Some(dist), - flags: baseline_ned_fields.flags, - num_sats: baseline_ned_fields.n_sats, - }) { - eprintln!("Unable to to write to baseline log, error {}.", err); + { + let mut shared_data = self.shared_state.lock().unwrap(); + if let Some(ref mut baseline_file) = (*shared_data).baseline_tab.log_file { + let pc_time = format!("{}:{:0>6.06}", tloc, secloc); + if let Err(err) = baseline_file.serialize(&BaselineLog { + pc_time, + gps_time, + tow_s: Some(tow), + north_m: Some(n), + east_m: Some(e), + down_m: Some(d), + h_accuracy_m: Some(h_accuracy), + v_accuracy_m: Some(v_accuracy), + distance_m: Some(dist), + flags: baseline_ned_fields.flags, + num_sats: baseline_ned_fields.n_sats, + }) { + eprintln!("Unable to to write to baseline log, error {}.", err); + } } } diff --git a/console_backend/src/solution_tab.rs b/console_backend/src/solution_tab.rs index 76e500bad..653114d2e 100644 --- a/console_backend/src/solution_tab.rs +++ b/console_backend/src/solution_tab.rs @@ -9,7 +9,7 @@ use std::{collections::HashMap, time::Instant}; use crate::constants::*; use crate::date_conv::*; -use crate::output::{CsvSerializer, PosLLHLog, VelLog}; +use crate::output::{PosLLHLog, VelLog}; use crate::piksi_tools_constants::EMPTY_STR; use crate::types::{ CapnProtoSender, Deque, Dops, GnssModes, GpsTime, PosLLH, SharedState, UtcDateTime, VelNED, @@ -67,7 +67,6 @@ pub struct SolutionTab { pub modes: Deque, pub nsec: Option, pub pending_draw_modes: Vec, - pub pos_log_file: Option, pub shared_state: SharedState, pub sln_cur_data: Vec>, pub sln_data: Vec>, @@ -76,7 +75,6 @@ pub struct SolutionTab { pub unit: &'static str, pub utc_source: Option, pub utc_time: Option, - pub vel_log_file: Option, pub week: Option, } @@ -110,7 +108,6 @@ impl SolutionTab { ], nsec: Some(0), pending_draw_modes: vec![], - pos_log_file: None, shared_state, sln_cur_data: { vec![vec![]; NUM_GNSS_MODES as usize] }, sln_data: { vec![vec![]; NUM_GNSS_MODES as usize] }, @@ -134,7 +131,6 @@ impl SolutionTab { unit: DEGREES, utc_source: None, utc_time: None, - vel_log_file: None, week: None, } } @@ -277,26 +273,29 @@ impl SolutionTab { // TODO(johnmichael.burke@) https://swift-nav.atlassian.net/browse/CPP-95 // Validate logging. - if let Some(vel_file) = &mut self.vel_log_file { - let mut gps_time = None; - if let Some(tgps) = tgps_ { - if let Some(secgps) = secgps_ { - gps_time = Some(format!("{}:{:0>6.06}", tgps, secgps)); + { + let mut shared_data = self.shared_state.lock().unwrap(); + if let Some(ref mut vel_file) = (*shared_data).solution_tab.velocity_tab.log_file { + let mut gps_time = None; + if let Some(tgps) = tgps_ { + if let Some(secgps) = secgps_ { + gps_time = Some(format!("{}:{:0>6.06}", tgps, secgps)); + } + } + let pc_time = format!("{}:{:0>6.06}", tloc, secloc); + if let Err(err) = vel_file.serialize(&VelLog { + pc_time, + gps_time, + tow_s: Some(tow), + north_mps: Some(n), + east_mps: Some(e), + down_mps: Some(d), + speed_mps: Some(speed), + flags: vel_ned_fields.flags, + num_signals: vel_ned_fields.n_sats, + }) { + eprintln!("Unable to to write to vel log, error {}.", err); } - } - let pc_time = format!("{}:{:0>6.06}", tloc, secloc); - if let Err(err) = vel_file.serialize(&VelLog { - pc_time, - gps_time, - tow_s: Some(tow), - north_mps: Some(n), - east_mps: Some(e), - down_mps: Some(d), - speed_mps: Some(speed), - flags: vel_ned_fields.flags, - num_signals: vel_ned_fields.n_sats, - }) { - eprintln!("Unable to to write to vel log, error {}.", err); } } self.table @@ -451,21 +450,24 @@ impl SolutionTab { // TODO(johnmichael.burke@) https://swift-nav.atlassian.net/browse/CPP-95 // Validate logging. - if let Some(pos_file) = &mut self.pos_log_file { - let pc_time = format!("{}:{:>6.06}", tloc, secloc); - if let Err(err) = pos_file.serialize(&PosLLHLog { - pc_time, - gps_time, - tow_s: Some(tow), - latitude_d: Some(pos_llh_fields.lat), - longitude_d: Some(pos_llh_fields.lon), - altitude_m: Some(pos_llh_fields.height), - h_accuracy_m: Some(pos_llh_fields.h_accuracy), - v_accuracy_m: Some(pos_llh_fields.v_accuracy), - n_sats: pos_llh_fields.n_sats, - flags: pos_llh_fields.flags, - }) { - eprintln!("Unable to to write to pos llh log, error {}.", err); + { + let mut shared_data = self.shared_state.lock().unwrap(); + if let Some(ref mut pos_file) = (*shared_data).solution_tab.position_tab.log_file { + let pc_time = format!("{}:{:>6.06}", tloc, secloc); + if let Err(err) = pos_file.serialize(&PosLLHLog { + pc_time, + gps_time, + tow_s: Some(tow), + latitude_d: Some(pos_llh_fields.lat), + longitude_d: Some(pos_llh_fields.lon), + altitude_m: Some(pos_llh_fields.height), + h_accuracy_m: Some(pos_llh_fields.h_accuracy), + v_accuracy_m: Some(pos_llh_fields.v_accuracy), + n_sats: pos_llh_fields.n_sats, + flags: pos_llh_fields.flags, + }) { + eprintln!("Unable to to write to pos llh log, error {}.", err); + } } } From 43ba2d929fcfa1fbaa85f6af4e8f33c148d4d339 Mon Sep 17 00:00:00 2001 From: Steve Meyer Date: Fri, 27 Aug 2021 10:53:29 -0700 Subject: [PATCH 4/6] mutex :( --- console_backend/src/bin/fft_monitor.rs | 6 +- console_backend/src/bin/fileio.rs | 16 +-- console_backend/src/broadcaster.rs | 4 +- console_backend/src/connection.rs | 16 +-- console_backend/src/fft_monitor.rs | 3 +- console_backend/src/fileio.rs | 142 +++++++++++++----------- console_backend/src/lib.rs | 22 ++-- console_backend/src/process_messages.rs | 95 ++++++++++------ console_backend/src/types.rs | 9 +- 9 files changed, 174 insertions(+), 139 deletions(-) diff --git a/console_backend/src/bin/fft_monitor.rs b/console_backend/src/bin/fft_monitor.rs index 1d04b7efc..166ee94df 100644 --- a/console_backend/src/bin/fft_monitor.rs +++ b/console_backend/src/bin/fft_monitor.rs @@ -1,3 +1,4 @@ +use anyhow::Context; use clap::Clap; use crossbeam::{channel, scope}; use sbp::{messages::piksi::MsgSpecan, sbp_tools::SBPTools}; @@ -60,7 +61,10 @@ fn main() -> Result<()> { println!("Writing to file: {}", &filename); let channel = opts.channel; let num_ffts = opts.num_ffts; - let (rdr, _) = opts.into_conn().try_connect(/*shared_state=*/ None)?; + let (rdr, _) = opts + .into_conn() + .try_connect(/*shared_state=*/ None) + .context("while connecting")?; scope(|s| { s.spawn(|_| run(rdr)); diff --git a/console_backend/src/bin/fileio.rs b/console_backend/src/bin/fileio.rs index 32a299558..cda84cea3 100644 --- a/console_backend/src/bin/fileio.rs +++ b/console_backend/src/bin/fileio.rs @@ -8,7 +8,7 @@ use crossbeam::{channel, scope}; use sbp::sbp_tools::SBPTools; use console_backend::{ - broadcaster::Broadcaster, + broadcaster::Link, cli_options::Input, fileio::Fileio, types::{MsgSender, Result}, @@ -49,15 +49,15 @@ pub enum Opts { } fn main() -> Result<()> { - let bc = Broadcaster::new(); + let link = Link::new(); + let link_source = link.clone(); let (done_tx, done_rx) = channel::bounded(0); - let bc_source = bc.clone(); let run = move |rdr| { let messages = sbp::iter_messages(rdr).log_errors(log::Level::Debug); for msg in messages { - bc_source.send(&msg, None); + link_source.send(&msg, None); if done_rx.try_recv().is_ok() { break; } @@ -74,7 +74,7 @@ fn main() -> Result<()> { let sender = MsgSender::new(wtr); scope(|s| { s.spawn(|_| run(rdr)); - let mut fileio = Fileio::new(bc, sender); + let mut fileio = Fileio::new(link, sender); let data = fs::File::open(source)?; fileio.overwrite(dest, data)?; eprintln!("file written successfully."); @@ -92,7 +92,7 @@ fn main() -> Result<()> { let sender = MsgSender::new(wtr); scope(|s| { s.spawn(|_| run(rdr)); - let mut fileio = Fileio::new(bc, sender); + let mut fileio = Fileio::new(link, sender); let dest: Box = match dest { Some(path) => Box::new(fs::File::create(path)?), None => Box::new(io::stdout()), @@ -108,7 +108,7 @@ fn main() -> Result<()> { let sender = MsgSender::new(wtr); scope(|s| { s.spawn(|_| run(rdr)); - let mut fileio = Fileio::new(bc, sender); + let mut fileio = Fileio::new(link, sender); let files = fileio.readdir(path)?; eprintln!("{:#?}", files); done_tx.send(true).unwrap(); @@ -121,7 +121,7 @@ fn main() -> Result<()> { let sender = MsgSender::new(wtr); scope(|s| { s.spawn(|_| run(rdr)); - let fileio = Fileio::new(bc, sender); + let fileio = Fileio::new(link, sender); fileio.remove(path)?; eprintln!("file deleted."); done_tx.send(true).unwrap(); diff --git a/console_backend/src/broadcaster.rs b/console_backend/src/broadcaster.rs index 7aa5ff2aa..b0443cf46 100644 --- a/console_backend/src/broadcaster.rs +++ b/console_backend/src/broadcaster.rs @@ -208,7 +208,7 @@ where } struct Callback<'a> { - func: Box, + func: Box, msg_types: &'static [u16], } @@ -241,7 +241,7 @@ impl<'a> Link<'a> { pub fn register_cb(&mut self, mut handler: H) -> Key where - H: Handler + 'a, + H: Handler + Send + 'a, E: Event, { let mut cbs = self.callbacks.lock().expect(Self::CB_LOCK_FAILURE); diff --git a/console_backend/src/connection.rs b/console_backend/src/connection.rs index 7bd5e30b5..6483bd967 100644 --- a/console_backend/src/connection.rs +++ b/console_backend/src/connection.rs @@ -2,7 +2,7 @@ use crate::constants::*; use crate::errors::*; use crate::process_messages::process_messages; use crate::types::*; -use chrono::{DateTime, Utc}; +use anyhow::anyhow; use crossbeam::channel::{unbounded, Receiver, Sender}; use log::{error, info}; use std::{ @@ -16,10 +16,6 @@ use std::{ time::Duration, }; -pub type Error = std::boxed::Box; -pub type Result = std::result::Result; -pub type UtcDateTime = DateTime; - #[derive(Clone)] pub struct TcpConnection { name: String, @@ -33,13 +29,11 @@ impl TcpConnection { } fn socket_addrs(name: String) -> Result { let socket = &mut name.to_socket_addrs()?; - let socket = if let Some(socket_) = socket.next() { - socket_ + if let Some(s) = socket.next() { + Ok(s) } else { - let e: Box = String::from(TCP_CONNECTION_PARSING_FAILURE).into(); - return Err(e); - }; - Ok(socket) + Err(anyhow!("{}", TCP_CONNECTION_PARSING_FAILURE)) + } } fn name(&self) -> String { self.name.clone() diff --git a/console_backend/src/fft_monitor.rs b/console_backend/src/fft_monitor.rs index a339eefc8..f28c57bd4 100644 --- a/console_backend/src/fft_monitor.rs +++ b/console_backend/src/fft_monitor.rs @@ -1,5 +1,6 @@ use crate::constants::{AMPLITUDES, CHANNELS, FREQUENCIES, SIGNALS_TOTAL}; use crate::types::{Result, Specan}; +use anyhow::bail; use serde::Serialize; use std::{ collections::HashMap, @@ -111,7 +112,7 @@ impl FftMonitor { if let Some(freqs) = fft.get(FREQUENCIES) { if let Some(amps) = fft.get(AMPLITUDES) { if freqs.len() != amps.len() { - return Err(format!("Frequencies length does not match amplitudes length for {:?}", gps_time).into()); + bail!("Frequencies length does not match amplitudes length for {:?}", gps_time); } if let Some(chan_ffts) = self.ffts.get_mut(&channel) { chan_ffts.append(&mut vec![fft]); diff --git a/console_backend/src/fileio.rs b/console_backend/src/fileio.rs index 8fd9c67c9..7d54ee302 100644 --- a/console_backend/src/fileio.rs +++ b/console_backend/src/fileio.rs @@ -4,6 +4,7 @@ use std::{ time::{Duration, Instant}, }; +use anyhow::{anyhow, bail}; use crossbeam::{atomic::AtomicCell, channel, scope, select, utils::Backoff}; use rand::Rng; use sbp::messages::{ @@ -16,7 +17,7 @@ use sbp::messages::{ }; use crate::{ - broadcaster::Broadcaster, + broadcaster::Link, types::{MsgSender, Result}, }; @@ -35,16 +36,16 @@ const OFFSET_LEN: usize = 4; const NULL_SEP_LEN: usize = 1; const WRITE_REQ_OVERHEAD_LEN: usize = SEQUENCE_LEN + OFFSET_LEN + NULL_SEP_LEN; -pub struct Fileio { - broadcast: Broadcaster, +pub struct Fileio<'a> { + link: Link<'a>, sender: MsgSender, config: Option, } -impl Fileio { - pub fn new(broadcast: Broadcaster, sender: MsgSender) -> Self { +impl<'a> Fileio<'a> { + pub fn new(link: Link<'a>, sender: MsgSender) -> Self { Self { - broadcast, + link, sender, config: None, } @@ -95,14 +96,11 @@ impl Fileio { open_requests.fetch_add(1); } - sbp::Result::Ok(()) + Result::Ok(()) }); - let (sub, key) = self.broadcast.subscribe::(); - s.spawn(move |_| { - for res in sub.iter() { - res_tx.send(res).unwrap(); - } + let key = self.link.register_cb(move |msg: MsgFileioReadResp| { + res_tx.send(msg).unwrap(); }); loop { @@ -112,7 +110,7 @@ impl Fileio { pending.insert(sequence, request); }, recv(res_rx) -> msg => { - let (msg, _) = msg?; + let msg = msg?; let req = match pending.remove(&msg.sequence) { Some(req) => req, None => continue, @@ -148,7 +146,7 @@ impl Fileio { } } - self.broadcast.unsubscribe(key); + self.link.unregister_cb(key); Ok(()) }) @@ -217,14 +215,11 @@ impl Fileio { open_requests.fetch_add(1); } - sbp::Result::Ok(()) + Result::Ok(()) }); - let (sub, key) = self.broadcast.subscribe::(); - s.spawn(move |_| { - for res in sub.iter() { - res_tx.send(res).unwrap(); - } + let key = self.link.register_cb(move |msg: MsgFileioWriteResp| { + res_tx.send(msg).unwrap(); }); let mut pending: HashMap = HashMap::new(); @@ -240,7 +235,7 @@ impl Fileio { pending.insert(req_state.sequence, (req_state, req)); }, recv(res_rx) -> msg => { - let (msg, _) = msg?; + let msg = msg?; if pending.remove(&msg.sequence).is_none() { continue } @@ -260,7 +255,7 @@ impl Fileio { } } - self.broadcast.unsubscribe(key); + self.link.unregister_cb(key); Result::Ok(()) }) @@ -273,38 +268,54 @@ impl Fileio { let mut seq = new_sequence(); let mut files = vec![]; - loop { - self.sender.send(SBP::from(MsgFileioReadDirReq { - sender_id: None, - sequence: seq, - offset: files.len() as u32, - dirname: path.clone().into(), - }))?; - - let (reply, _) = self - .broadcast - .wait::(READDIR_TIMEOUT)?; - - if reply.sequence != seq { - return Err(format!( - "MsgFileioReadDirResp didn't match request ({} vs {})", - reply.sequence, seq - ) - .into()); - } + let (tx, rx) = channel::unbounded(); - let mut contents = reply.contents; + let key = self.link.register_cb(move |msg: MsgFileioReadDirResp| { + tx.send(msg).unwrap(); + }); - if contents.is_empty() { - return Ok(files); - } - if contents[contents.len() - 1] == b'\0' { - contents.remove(contents.len() - 1); - } - for f in contents.split(|b| b == &b'\0') { - files.push(String::from_utf8_lossy(f).into_owned()); + self.sender.send(SBP::from(MsgFileioReadDirReq { + sender_id: None, + sequence: seq, + offset: files.len() as u32, + dirname: path.clone().into(), + }))?; + + loop { + select! { + recv(rx) -> msg => { + let msg = msg?; + if msg.sequence != seq { + self.link.unregister_cb(key); + bail!( + "MsgFileioReadDirResp didn't match request ({} vs {})", + msg.sequence, seq + ); + } + let mut contents = msg.contents; + if contents.is_empty() { + self.link.unregister_cb(key); + return Ok(files); + } + if contents[contents.len() - 1] == b'\0' { + contents.remove(contents.len() - 1); + } + for f in contents.split(|b| b == &b'\0') { + files.push(String::from_utf8_lossy(f).into_owned()); + } + seq += 1; + self.sender.send(SBP::from(MsgFileioReadDirReq { + sender_id: None, + sequence: seq, + offset: files.len() as u32, + dirname: path.clone().into(), + }))?; + }, + recv(channel::tick(READDIR_TIMEOUT)) -> _ => { + self.link.unregister_cb(key); + bail!("MsgFileioReadDirReq timed out"); + } } - seq += 1; } } @@ -322,12 +333,19 @@ impl Fileio { } let sequence = new_sequence(); - let (tx, rx) = channel::bounded(0); + let (stop_tx, stop_rx) = channel::bounded(0); + let (tx, rx) = channel::bounded(1); + let key = self.link.register_cb(move |msg: MsgFileioConfigResp| { + tx.send(FileioConfig::new(msg)).unwrap(); + stop_tx.send(true).unwrap(); + }); + + let sender = &self.sender; let config = scope(|s| { s.spawn(|_| { - while rx.try_recv().is_err() { - let _ = self.sender.send(SBP::from(MsgFileioConfigReq { + while stop_rx.try_recv().is_err() { + let _ = sender.send(SBP::from(MsgFileioConfigReq { sender_id: None, sequence, })); @@ -335,17 +353,15 @@ impl Fileio { } }); - let config = self - .broadcast - .wait::(CONFIG_REQ_TIMEOUT) - .map_or_else(|_| Default::default(), |(msg, _)| FileioConfig::new(msg)); - - tx.send(true).unwrap(); - - config + match rx.recv_timeout(CONFIG_REQ_TIMEOUT) { + Ok(config) => config, + Err(_) => Default::default(), + } }) .unwrap(); + self.link.unregister_cb(key); + self.config = Some(config); self.config.clone().unwrap() } @@ -414,7 +430,7 @@ impl FileioRequest { self.sent_at = Instant::now(); if self.retries >= MAX_RETRIES { - Err("fileio send message timeout".into()) + Err(anyhow!("fileio send message timeout")) } else { Ok(()) } diff --git a/console_backend/src/lib.rs b/console_backend/src/lib.rs index 4da86696e..36fa566be 100644 --- a/console_backend/src/lib.rs +++ b/console_backend/src/lib.rs @@ -32,7 +32,7 @@ pub mod tracking_signals_tab; pub mod types; pub mod utils; -use std::cell::RefCell; +use std::sync::Mutex; use crate::{ advanced_ins_tab::AdvancedInsTab, advanced_magnetometer_tab::AdvancedMagnetometerTab, @@ -43,16 +43,16 @@ use crate::{ }; struct Tabs<'a, S: types::CapnProtoSender> { - pub main: RefCell>, - pub advanced_ins: RefCell>, - pub advanced_magnetometer: RefCell>, - pub baseline: RefCell>, - pub tracking_signals: RefCell>, - pub solution: RefCell>, - pub observation: RefCell>, - pub solution_velocity: RefCell>, - pub advanced_spectrum_analyzer: RefCell>, - pub status_bar: RefCell>, + pub main: Mutex>, + pub advanced_ins: Mutex>, + pub advanced_magnetometer: Mutex>, + pub baseline: Mutex>, + pub tracking_signals: Mutex>, + pub solution: Mutex>, + pub observation: Mutex>, + pub solution_velocity: Mutex>, + pub advanced_spectrum_analyzer: Mutex>, + pub status_bar: Mutex>, } impl<'a, S: types::CapnProtoSender> Tabs<'a, S> { diff --git a/console_backend/src/process_messages.rs b/console_backend/src/process_messages.rs index 6d9b9b4a9..e61c56f55 100644 --- a/console_backend/src/process_messages.rs +++ b/console_backend/src/process_messages.rs @@ -58,75 +58,91 @@ where link.register_cb(|msg: MsgAgeCorrections| { tabs.baseline - .borrow_mut() + .lock() + .unwrap() .handle_age_corrections(msg.clone()); tabs.solution - .borrow_mut() + .lock() + .unwrap() .handle_age_corrections(msg.clone()); - tabs.status_bar.borrow_mut().handle_age_corrections(msg); + tabs.status_bar.lock().unwrap().handle_age_corrections(msg); }); link.register_cb(|msg: MsgAngularRate| { - tabs.solution.borrow_mut().handle_angular_rate(msg); + tabs.solution.lock().unwrap().handle_angular_rate(msg); }); link.register_cb(|msg: MsgBaselineHeading| { - tabs.baseline.borrow_mut().handle_baseline_heading(msg); + tabs.baseline.lock().unwrap().handle_baseline_heading(msg); }); link.register_cb(|msg: BaselineNED| { - tabs.baseline.borrow_mut().handle_baseline_ned(msg.clone()); - tabs.status_bar.borrow_mut().handle_baseline_ned(msg); + tabs.baseline + .lock() + .unwrap() + .handle_baseline_ned(msg.clone()); + tabs.status_bar.lock().unwrap().handle_baseline_ned(msg); }); link.register_cb(|msg: Dops| { - tabs.solution.borrow_mut().handle_dops(msg); + tabs.solution.lock().unwrap().handle_dops(msg); }); link.register_cb(|msg: GpsTime| { - tabs.baseline.borrow_mut().handle_gps_time(msg.clone()); - tabs.solution.borrow_mut().handle_gps_time(msg); + tabs.baseline.lock().unwrap().handle_gps_time(msg.clone()); + tabs.solution.lock().unwrap().handle_gps_time(msg); }); link.register_cb(|_: MsgHeartbeat| { - tabs.status_bar.borrow_mut().handle_heartbeat(); + tabs.status_bar.lock().unwrap().handle_heartbeat(); }); link.register_cb(|msg: MsgImuAux| { - tabs.advanced_ins.borrow_mut().handle_imu_aux(msg); + tabs.advanced_ins.lock().unwrap().handle_imu_aux(msg); }); link.register_cb(|msg: MsgImuRaw| { - tabs.advanced_ins.borrow_mut().handle_imu_raw(msg); + tabs.advanced_ins.lock().unwrap().handle_imu_raw(msg); }); link.register_cb(|msg: MsgInsStatus| { - tabs.solution.borrow_mut().handle_ins_status(msg.clone()); - tabs.status_bar.borrow_mut().handle_ins_status(msg); + tabs.solution.lock().unwrap().handle_ins_status(msg.clone()); + tabs.status_bar.lock().unwrap().handle_ins_status(msg); }); link.register_cb(|msg: MsgInsUpdates| { tabs.advanced_ins - .borrow_mut() + .lock() + .unwrap() .fusion_engine_status_bar .handle_ins_updates(msg.clone()); - tabs.solution.borrow_mut().handle_ins_updates(msg.clone()); - tabs.status_bar.borrow_mut().handle_ins_updates(msg); + tabs.solution + .lock() + .unwrap() + .handle_ins_updates(msg.clone()); + tabs.status_bar.lock().unwrap().handle_ins_updates(msg); }); link.register_cb(|msg: MsgMagRaw| { - tabs.advanced_magnetometer.borrow_mut().handle_mag_raw(msg); + tabs.advanced_magnetometer + .lock() + .unwrap() + .handle_mag_raw(msg); }); link.register_cb(|msg: MsgMeasurementState| { tabs.tracking_signals - .borrow_mut() + .lock() + .unwrap() .handle_msg_measurement_state(msg.states); }); link.register_cb(|msg: ObservationMsg| { - tabs.tracking_signals.borrow_mut().handle_obs(msg.clone()); - tabs.observation.borrow_mut().handle_obs(msg); + tabs.tracking_signals + .lock() + .unwrap() + .handle_obs(msg.clone()); + tabs.observation.lock().unwrap().handle_obs(msg); }); link.register_cb(|_: MsgObsDepA| { @@ -134,52 +150,54 @@ where }); link.register_cb(|msg: MsgOrientEuler| { - tabs.solution.borrow_mut().handle_orientation_euler(msg); + tabs.solution.lock().unwrap().handle_orientation_euler(msg); }); link.register_cb(|msg: PosLLH| { - tabs.solution.borrow_mut().handle_pos_llh(msg.clone()); - tabs.status_bar.borrow_mut().handle_pos_llh(msg); + tabs.solution.lock().unwrap().handle_pos_llh(msg.clone()); + tabs.status_bar.lock().unwrap().handle_pos_llh(msg); }); link.register_cb(|msg: MsgPosLLHCov| { - tabs.solution.borrow_mut().handle_pos_llh_cov(msg); + tabs.solution.lock().unwrap().handle_pos_llh_cov(msg); }); link.register_cb(|msg: Specan| { tabs.advanced_spectrum_analyzer - .borrow_mut() + .lock() + .unwrap() .handle_specan(msg); }); link.register_cb(|msg: MsgTrackingState| { tabs.tracking_signals - .borrow_mut() + .lock() + .unwrap() .handle_msg_tracking_state(msg.states); }); link.register_cb(|msg: VelNED| { - tabs.solution.borrow_mut().handle_vel_ned(msg); + tabs.solution.lock().unwrap().handle_vel_ned(msg); }); link.register_cb(|msg: MsgVelNED| { // why does this tab not take both VelNED messages? - tabs.solution_velocity.borrow_mut().handle_vel_ned(msg); + tabs.solution_velocity.lock().unwrap().handle_vel_ned(msg); }); link.register_cb(|msg: MsgUtcTime| { - tabs.baseline.borrow_mut().handle_utc_time(msg.clone()); - tabs.solution.borrow_mut().handle_utc_time(msg); + tabs.baseline.lock().unwrap().handle_utc_time(msg.clone()); + tabs.solution.lock().unwrap().handle_utc_time(msg); }); link.register_cb(handle_log_msg); for (message, gps_time) in messages { if !shared_state.is_running() { - if let Err(e) = tabs.main.borrow_mut().end_csv_logging() { + if let Err(e) = tabs.main.lock().unwrap().end_csv_logging() { error!("Issue closing csv file, {}", e); } - tabs.main.borrow_mut().close_sbp(); + tabs.main.lock().unwrap().close_sbp(); break; } if shared_state.is_paused() { @@ -190,12 +208,15 @@ where sleep(Duration::from_millis(PAUSE_LOOP_SLEEP_DURATION_MS)); } } - tabs.main.borrow_mut().serialize_sbp(&message); - tabs.status_bar.borrow_mut().add_bytes(message.sbp_size()); + tabs.main.lock().unwrap().serialize_sbp(&message); + tabs.status_bar + .lock() + .unwrap() + .add_bytes(message.sbp_size()); let sent = link.send(&message, gps_time.clone()); if let RealtimeDelay::On = realtime_delay { if sent { - tabs.main.borrow_mut().realtime_delay(gps_time); + tabs.main.lock().unwrap().realtime_delay(gps_time); } else { debug!( "Message, {}, ignored for realtime delay.", diff --git a/console_backend/src/types.rs b/console_backend/src/types.rs index 0de8d870b..18aa20096 100644 --- a/console_backend/src/types.rs +++ b/console_backend/src/types.rs @@ -50,9 +50,8 @@ use std::{ }, time::Instant, }; - -pub type Error = std::boxed::Box; -pub type Result = std::result::Result; +pub type Error = anyhow::Error; +pub type Result = anyhow::Result; pub type UtcDateTime = DateTime; /// Sends SBP messages to the connected device @@ -77,10 +76,10 @@ impl MsgSender { } } - pub fn send(&self, mut msg: SBP) -> sbp::Result<()> { + pub fn send(&self, mut msg: SBP) -> Result<()> { msg.set_sender_id(Self::SENDER_ID); let mut framed = self.inner.lock().expect(Self::LOCK_FAILURE); - framed.send(msg)?; + framed.send(msg).context("while sending a message")?; Ok(()) } } From 0f65b2303142826e28995b4785d05953b0e2fd7b Mon Sep 17 00:00:00 2001 From: Steve Meyer Date: Fri, 27 Aug 2021 11:06:56 -0700 Subject: [PATCH 5/6] lint --- console_backend/src/main_tab.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/console_backend/src/main_tab.rs b/console_backend/src/main_tab.rs index c93d252b0..7daed7b02 100644 --- a/console_backend/src/main_tab.rs +++ b/console_backend/src/main_tab.rs @@ -30,8 +30,8 @@ impl<'a, S: CapnProtoSender> MainTab { sbp_logger: None, last_gps_time: None, last_gps_update: Instant::now(), - client_sender: client_sender.clone(), - shared_state: shared_state.clone(), + client_sender, + shared_state, } } From 2d5314f3689465e34a64a2501ae10324bc6a6424 Mon Sep 17 00:00:00 2001 From: Steve Meyer Date: Fri, 27 Aug 2021 11:16:59 -0700 Subject: [PATCH 6/6] lintz --- console_backend/src/broadcaster.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/console_backend/src/broadcaster.rs b/console_backend/src/broadcaster.rs index b0443cf46..bc5abc908 100644 --- a/console_backend/src/broadcaster.rs +++ b/console_backend/src/broadcaster.rs @@ -200,9 +200,8 @@ where E: Event, { fn run(&mut self, _event: E, time: MaybeGpsTime) { - match time { - Some(Ok(time)) => (self)(time), - _ => {} + if let Some(Ok(time)) = time { + (self)(time) } } }