diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index d793c0a38..0dfa8a00e 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -90,8 +90,11 @@ jobs: ${{ runner.os }}- - name: Set up python builder + shell: bash run: | cargo make setup-builder + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - name: Run Backend Benchmarks (Windows) env: @@ -156,8 +159,11 @@ jobs: ${{ runner.os }}- - name: Set up python builder + shell: bash run: | cargo make setup-builder + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - name: Run Checks run: | @@ -262,8 +268,11 @@ jobs: ${{ runner.os }}- - name: Set up python builder + shell: bash run: | cargo make setup-builder + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - name: Build ${{ runner.os }} Binaries. env: diff --git a/Cargo.lock b/Cargo.lock index 1a9f12731..3ff9dff9e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1525,9 +1525,9 @@ dependencies = [ [[package]] name = "sbp-settings" -version = "0.1.2" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52b61e692c1dfdbf437e0efd6596cdbd059818697177c866622d8a63fa9c1860" +checksum = "2ff2fb432986d2f8955bf741221af231bc30be7433d1b93fe0ceb1489b947762" dependencies = [ "bindgen", "cmake", diff --git a/Makefile.toml b/Makefile.toml index 01aabe0cd..02b710de2 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -147,6 +147,18 @@ Get-ChildItem .\console_backend -Recurse -Include @("*.egg-info", "*.dist-info") command = "${PYTHON}" args = ["-m", "swiftnav_console.main", "${@}"] +[tasks.start-console-gdb] +command = "rust-gdb" +args = [ + "-ex", + "run", + "--args", + "${PYTHON}", + "-m", + "swiftnav_console.main", + "${@}", +] + [tasks.qml-run] dependencies = ["generate-resources"] run_task = "start-console" @@ -161,6 +173,16 @@ dependencies = [ ] run_task = "start-console" +[tasks.gdb-run] +dependencies = [ + "copy-capnp", + "generate-resources", + "remove-egg-dist", + "store-version", + "install-backend", +] +run_task = "start-console-gdb" + [tasks.prod-run] dependencies = [ "copy-capnp", @@ -214,7 +236,6 @@ command = "${PYTHON}" args = ["setup.py", "bdist_wheel"] [tasks.get-get-pip] -condition = { files_not_exist = ["${WORKSPACE}/get-pip.py"] } script_runner = "@duckscript" script = ''' wget -O ./get-pip.py https://bootstrap.pypa.io/get-pip.py @@ -248,7 +269,6 @@ cm_run_task generate-resources ''' [tasks.prep-dist] -dependencies = ["get-standalone-py"] script_runner = "@duckscript" script = ''' cm_run_task copy-capnp @@ -502,7 +522,7 @@ args = ["clippy", "--all-targets", "--", "--deny", "warnings"] dependencies = ["store-version", "copy-capnp"] command = "cargo" cwd = "console_backend" -args = ["test", "--features", "tests", "--", "--nocapture"] +args = ["test", "--features", "tests", "${@}", "--", "--nocapture"] [tasks.rust-type-check] dependencies = ["store-version", "copy-capnp"] diff --git a/console_backend/Cargo.toml b/console_backend/Cargo.toml index fc5562a45..c74ff695b 100644 --- a/console_backend/Cargo.toml +++ b/console_backend/Cargo.toml @@ -40,8 +40,8 @@ minreq = { version = "2.4.2", features = ["https"] } regex = { version = "1.5.4" } semver = { version = "1" } rust-ini = "0.17.0" -sbp = { version = "4.0", features = ["json", "link", "swiftnav"] } -sbp-settings = "0.1" +sbp = { version = "4.0.2", features = ["json", "link", "swiftnav"] } +sbp-settings = "0.2" env_logger = { version = "0.9", optional = true } mimalloc = { version = "0.1", default-features = false } diff --git a/console_backend/benches/cpu_benches.rs b/console_backend/benches/cpu_benches.rs index 461e2cb47..1ed983fe4 100644 --- a/console_backend/benches/cpu_benches.rs +++ b/console_backend/benches/cpu_benches.rs @@ -12,7 +12,8 @@ use std::{ extern crate console_backend; use console_backend::{ - connection::Connection, + common_constants::ConnectionState, + connection::{Connection, ConnectionManager}, process_messages, shared_state::SharedState, types::{ClientSender, RealtimeDelay}, @@ -51,7 +52,8 @@ fn run_process_messages(file_in_name: &str, failure: bool) { assert!(iter_count > 0); }); { - let (client_send_, client_recv) = channel::unbounded::>(); + let (client_send, client_recv) = channel::unbounded::>(); + let client_send = ClientSender::new(client_send); client_recv_tx .send(client_recv) .expect("sending client recv handle should succeed"); @@ -59,15 +61,13 @@ fn run_process_messages(file_in_name: &str, failure: bool) { thread::sleep(time::Duration::from_millis(FAILURE_CASE_SLEEP_MILLIS)); } let shared_state = SharedState::new(); - let client_send = ClientSender::new(client_send_); - shared_state.set_running(true, client_send.clone()); shared_state.set_debug(true); - let conn = Connection::file( + let conn_manager = ConnectionManager::new(client_send, shared_state); + conn_manager.connect_to_file( file_in_name.into(), RealtimeDelay::Off, /*close_when_done=*/ true, ); - process_messages::process_messages(conn, shared_state, client_send).unwrap(); } recv_thread.join().expect("join should succeed"); } diff --git a/console_backend/src/bin/headless-console.rs b/console_backend/src/bin/headless-console.rs index bd4191746..3ceb1b4fd 100644 --- a/console_backend/src/bin/headless-console.rs +++ b/console_backend/src/bin/headless-console.rs @@ -2,7 +2,7 @@ use anyhow::Result; use chrono::prelude::*; use console_backend::{ cli_options::{handle_cli, CliOptions}, - connection::ConnectionState, + connection::ConnectionManager, log_panel::setup_logging, server_recv_thread::server_recv_thread, shared_state::SharedState, @@ -30,11 +30,11 @@ Usage: let client_send = ClientSender::new(client_send_); setup_logging(client_send.clone(), true); let shared_state = SharedState::new(); - let connection_state = ConnectionState::new(client_send.clone(), shared_state.clone()); - handle_cli(opt, &connection_state, shared_state.clone()); + let conn_manager = ConnectionManager::new(client_send.clone(), shared_state.clone()); + handle_cli(opt, &conn_manager, shared_state.clone()); refresh_navbar(&mut client_send.clone(), shared_state.clone()); refresh_loggingbar(&mut client_send.clone(), shared_state.clone()); - server_recv_thread(connection_state, client_send, server_recv, shared_state); + server_recv_thread(conn_manager, client_send, server_recv, shared_state); let mut msg_count: usize = 0; while client_recv.recv().is_ok() { diff --git a/console_backend/src/cli_options.rs b/console_backend/src/cli_options.rs index 3a1cc1b05..fda4954a4 100644 --- a/console_backend/src/cli_options.rs +++ b/console_backend/src/cli_options.rs @@ -16,7 +16,7 @@ use crate::shared_state::SharedState; use crate::types::{FlowControl, RealtimeDelay}; use crate::{ common_constants::{SbpLogging, Tabs}, - connection::{Connection, ConnectionState}, + connection::{Connection, ConnectionManager}, }; #[derive(Debug)] @@ -262,18 +262,18 @@ fn is_baudrate(br: &str) -> Result<(), String> { /// /// # Parameters /// - `opt`: CLI Options to start specific connection type. -/// - `connection_state`: The Server state to start a specific connection. +/// - `conn_manager`: The Server state to start a specific connection. /// - `client_send`: Client Sender channel for communication from backend to frontend. /// - `shared_state`: The shared state for validating another connection is not already running. -pub fn handle_cli(opt: CliOptions, connection_state: &ConnectionState, shared_state: SharedState) { +pub fn handle_cli(opt: CliOptions, conn_manager: &ConnectionManager, shared_state: SharedState) { if let Some(opt_input) = opt.input { match opt_input { Input::Tcp { host, port } => { - connection_state.connect_to_host(host, port); + conn_manager.connect_to_host(host, port); } Input::File { file_in } => { let filename = file_in.display().to_string(); - connection_state.connect_to_file(filename, RealtimeDelay::On, opt.exit_after); + conn_manager.connect_to_file(filename, RealtimeDelay::On, opt.exit_after); } Input::Serial { serialport, @@ -281,7 +281,7 @@ pub fn handle_cli(opt: CliOptions, connection_state: &ConnectionState, shared_st flow_control, } => { let serialport = serialport.display().to_string(); - connection_state.connect_to_serial(serialport, baudrate, flow_control); + conn_manager.connect_to_serial(serialport, baudrate, flow_control); } } } diff --git a/console_backend/src/common_constants.rs b/console_backend/src/common_constants.rs index 1bf90480b..ae2ae2ece 100644 --- a/console_backend/src/common_constants.rs +++ b/console_backend/src/common_constants.rs @@ -157,8 +157,8 @@ pub enum Keys { PREVIOUS_PORTS, #[strum(serialize = "PREVIOUS_FILES")] PREVIOUS_FILES, - #[strum(serialize = "CONNECTED")] - CONNECTED, + #[strum(serialize = "CONNECTION_STATE")] + CONNECTION_STATE, #[strum(serialize = "PORT")] PORT, #[strum(serialize = "POS")] @@ -282,9 +282,9 @@ pub enum Keys { } #[derive(Clone, Debug, Display, EnumString, EnumVariantNames, Eq, Hash, PartialEq)] -pub enum ApplicationStates { - #[strum(serialize = "CLOSE")] - CLOSE, +pub enum ConnectionState { + #[strum(serialize = "CLOSED")] + CLOSED, #[strum(serialize = "CONNECTED")] CONNECTED, #[strum(serialize = "DISCONNECTED")] diff --git a/console_backend/src/connection.rs b/console_backend/src/connection.rs index 44723dc69..9025eeede 100644 --- a/console_backend/src/connection.rs +++ b/console_backend/src/connection.rs @@ -1,10 +1,11 @@ +use crate::common_constants as cc; use crate::constants::*; use crate::errors::*; -use crate::process_messages::process_messages; +use crate::process_messages::{process_messages, Messages, StopToken}; use crate::shared_state::SharedState; use crate::types::*; +use crate::watch::Watched; use anyhow::anyhow; -use crossbeam::channel::{unbounded, Receiver, Sender}; use log::{error, info}; use std::{ fmt::Debug, @@ -17,17 +18,261 @@ use std::{ time::Duration, }; -#[derive(Clone)] +#[derive(Debug)] +pub struct ConnectionManager { + conn: Watched>, + handle: Option>, +} + +impl ConnectionManager { + pub fn new(client_send: ClientSender, shared_state: SharedState) -> ConnectionManager { + let conn = Watched::new(None); + let handle = Some(thread::spawn({ + let conn = conn.clone(); + move || ConnectionManager::start_thd(client_send, shared_state, conn) + })); + ConnectionManager { conn, handle } + } + + /// Helper function for attempting to open a file and process SBP messages from it. + /// + /// # Parameters + /// - `filename`: The path to the filename to be read for SBP messages. + pub fn connect_to_file( + &self, + filename: String, + realtime_delay: RealtimeDelay, + close_when_done: bool, + ) { + let conn = Connection::file(filename, realtime_delay, close_when_done); + self.conn.send(Some(conn)); + } + + /// Helper function for attempting to open a tcp connection and process SBP messages from it. + /// + /// # Parameters + /// - `host`: The host portion of the TCP stream to open. + /// - `port`: The port to be used to open a TCP stream. + pub fn connect_to_host(&self, host: String, port: u16) { + let conn = Connection::tcp(host, port); + self.conn.send(Some(conn)); + } + + /// Helper function for attempting to open a serial port and process SBP messages from it. + /// + /// # Parameters + /// - `device`: The string path corresponding to the serial device to connect with. + /// - `baudrate`: The baudrate to use when communicating with the serial device. + /// - `flow`: The flow control mode to use when communicating with the serial device. + pub fn connect_to_serial(&self, device: String, baudrate: u32, flow: FlowControl) { + let conn = Connection::serial(device, baudrate, flow); + self.conn.send(Some(conn)); + } + + /// Send disconnect signal to server state loop. + pub fn disconnect(&self) { + self.conn.send(None); + } + + fn start_thd( + mut client_send: ClientSender, + shared_state: SharedState, + conn_info: Watched>, + ) { + let mut pm_thd: Option> = None; + let join = |thd: &mut Option>| { + if let Some(thd) = thd.take() { + thd.join().expect("process_messages thread panicked"); + } + }; + let mut recv = conn_info.watch(); + + info!("Console started..."); + + while let Ok(conn) = recv.wait() { + let conn = match conn { + Some(conn) => conn, + None => { + shared_state.set_connection(ConnectionState::Disconnected, &mut client_send); + join(&mut pm_thd); + info!("Disconnected successfully."); + log::logger().flush(); + continue; + } + }; + + let ((stop_token, messages), msg_sender) = match conn.try_connect(Some(&shared_state)) { + Ok((r, w)) => (Messages::new(r), MsgSender::new(w)), + Err(err) => { + error!("Unable to connect: {}", err); + log::logger().flush(); + continue; + } + }; + + let conn_watch = conn_info.clone(); + let conn = conn.clone(); + let shared_state = shared_state.clone(); + let mut client_send = client_send.clone(); + + shared_state.set_connection( + ConnectionState::Connected { + conn: conn.clone(), + stop_token, + }, + &mut client_send, + ); + + pm_thd = Some(thread::spawn(move || { + process_messages( + messages, + msg_sender, + conn.clone(), + shared_state, + client_send, + ); + if conn.close_when_done() { + conn_watch.close(); + } + })); + } + + log::logger().flush(); + shared_state.set_connection(ConnectionState::Closed, &mut client_send); + join(&mut pm_thd); + } +} + +impl Drop for ConnectionManager { + fn drop(&mut self) { + // breaks the `while let Ok(conn) ...` loop + self.conn.close(); + if let Some(h) = self.handle.take() { + let _ = h.join(); + } + } +} + +#[derive(Debug, Clone)] +pub enum ConnectionState { + /// App is shut down + Closed, + + /// Running but disconnected + Disconnected, + + /// Running and connected + Connected { + conn: Connection, + stop_token: StopToken, + }, +} + +impl ConnectionState { + pub fn is_disconnected(&self) -> bool { + matches!(self, Self::Disconnected) + } + + pub fn is_connected(&self) -> bool { + matches!(self, Self::Connected { .. }) + } +} + +impl std::fmt::Display for ConnectionState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ConnectionState::Closed => write!(f, "{}", cc::ConnectionState::CLOSED), + ConnectionState::Disconnected => write!(f, "{}", cc::ConnectionState::DISCONNECTED), + ConnectionState::Connected { .. } => { + write!(f, "{}", cc::ConnectionState::CONNECTED) + } + } + } +} + +#[derive(Debug, Clone)] +pub enum Connection { + Tcp(TcpConnection), + Serial(SerialConnection), + File(FileConnection), +} + +impl Connection { + pub fn tcp(host: String, port: u16) -> Self { + Connection::Tcp(TcpConnection::new(host, port)) + } + + pub fn serial(device: String, baudrate: u32, flow: FlowControl) -> Self { + Connection::Serial(SerialConnection::new(device, baudrate, flow)) + } + + pub fn file(filename: String, realtime_delay: RealtimeDelay, close_when_done: bool) -> Self { + Connection::File(FileConnection::new( + filename, + close_when_done, + realtime_delay, + )) + } + + pub fn name(&self) -> String { + match self { + Connection::Tcp(conn) => conn.name(), + Connection::File(conn) => conn.name(), + Connection::Serial(conn) => conn.name(), + } + } + + pub fn close_when_done(&self) -> bool { + match self { + Connection::File(conn) => conn.close_when_done(), + Connection::Tcp(_) | Connection::Serial(_) => false, + } + } + + pub fn realtime_delay(&self) -> RealtimeDelay { + match self { + Connection::File(conn) => conn.realtime_delay(), + Connection::Tcp(_) | Connection::Serial(_) => RealtimeDelay::Off, + } + } + + pub fn settings_enabled(&self) -> bool { + matches!(self, Connection::Tcp(_) | Connection::Serial(_)) + } + + pub fn is_serial(&self) -> bool { + matches!(self, Connection::Serial(_)) + } + + pub fn try_connect( + &self, + shared_state: Option<&SharedState>, + ) -> Result<(Box, Box)> { + match self { + Connection::Tcp(conn) => conn.clone().try_connect(shared_state), + Connection::File(conn) => conn.clone().try_connect(shared_state), + Connection::Serial(conn) => conn.clone().try_connect(shared_state), + } + } +} + +#[derive(Debug, Clone)] pub struct TcpConnection { name: String, host: String, port: u16, } + impl TcpConnection { fn new(host: String, port: u16) -> Self { let name = format!("{}:{}", host, port); Self { name, host, port } } + + fn name(&self) -> String { + self.name.clone() + } + fn socket_addrs(name: String) -> Result { let socket = &mut name.to_socket_addrs()?; if let Some(s) = socket.next() { @@ -36,12 +281,10 @@ impl TcpConnection { Err(anyhow!("{}", TCP_CONNECTION_PARSING_FAILURE)) } } - fn name(&self) -> String { - self.name.clone() - } + fn try_connect( self, - shared_state: Option, + shared_state: Option<&SharedState>, ) -> Result<(Box, Box)> { let socket = TcpConnection::socket_addrs(self.name.clone())?; let rdr = @@ -49,20 +292,21 @@ impl TcpConnection { rdr.set_read_timeout(Some(Duration::from_millis(SERIALPORT_READ_TIMEOUT_MS)))?; let writer = rdr.try_clone()?; info!("Connected to tcp stream!"); - if let Some(shared_state_) = shared_state { - shared_state_.update_tcp_history(self.host, self.port); + if let Some(shared_state) = shared_state { + shared_state.update_tcp_history(self.host, self.port); } Ok((Box::new(rdr), Box::new(writer))) } } -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct SerialConnection { - pub name: String, - pub device: String, - pub baudrate: u32, - pub flow: FlowControl, + name: String, + device: String, + baudrate: u32, + flow: FlowControl, } + impl SerialConnection { fn new(device: String, baudrate: u32, flow: FlowControl) -> Self { Self { @@ -72,12 +316,14 @@ impl SerialConnection { flow, } } + fn name(&self) -> String { self.name.clone() } + fn try_connect( self, - _shared_state: Option, + _shared_state: Option<&SharedState>, ) -> Result<(Box, Box)> { let rdr = serialport::new(self.device.clone(), self.baudrate) .flow_control(*self.flow) @@ -94,13 +340,14 @@ impl SerialConnection { } } -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct FileConnection { - pub name: String, - pub filepath: PathBuf, + name: String, + filepath: PathBuf, close_when_done: bool, realtime_delay: RealtimeDelay, } + impl FileConnection { fn new>( filepath: P, @@ -125,203 +372,29 @@ impl FileConnection { fn name(&self) -> String { self.name.clone() } + fn close_when_done(&self) -> bool { self.close_when_done } + fn realtime_delay(&self) -> RealtimeDelay { self.realtime_delay } + fn try_connect( self, - shared_state: Option, + shared_state: Option<&SharedState>, ) -> Result<(Box, Box)> { let rdr = fs::File::open(&self.filepath)?; let writer = io::sink(); info!("Opened file successfully!"); - if let Some(shared_state_) = shared_state { - shared_state_.update_file_history(self.filepath.to_string_lossy().to_string()); + if let Some(shared_state) = shared_state { + shared_state.update_file_history(self.filepath.to_string_lossy().to_string()); } Ok((Box::new(rdr), Box::new(writer))) } } -#[derive(Clone)] -pub enum Connection { - Tcp(TcpConnection), - File(FileConnection), - Serial(SerialConnection), -} -impl Connection { - pub fn tcp(host: String, port: u16) -> Self { - Connection::Tcp(TcpConnection::new(host, port)) - } - - pub fn serial(device: String, baudrate: u32, flow: FlowControl) -> Self { - Connection::Serial(SerialConnection::new(device, baudrate, flow)) - } - - pub fn file(filename: String, realtime_delay: RealtimeDelay, close_when_done: bool) -> Self { - Connection::File(FileConnection::new( - filename, - close_when_done, - realtime_delay, - )) - } - pub fn close_when_done(&self) -> bool { - match self { - Connection::File(conn) => conn.close_when_done(), - Connection::Tcp(_) | Connection::Serial(_) => false, - } - } - pub fn name(&self) -> String { - match self { - Connection::Tcp(conn) => conn.name(), - Connection::File(conn) => conn.name(), - Connection::Serial(conn) => conn.name(), - } - } - pub fn realtime_delay(&self) -> RealtimeDelay { - match self { - Connection::File(conn) => conn.realtime_delay(), - Connection::Tcp(_) | Connection::Serial(_) => RealtimeDelay::Off, - } - } - pub fn settings_enabled(&self) -> bool { - matches!(self, Connection::Tcp(_) | Connection::Serial(_)) - } - pub fn is_serial(&self) -> bool { - matches!(self, Connection::Serial(_)) - } - pub fn try_connect( - &self, - shared_state: Option, - ) -> Result<(Box, Box)> { - match self { - Connection::Tcp(conn) => conn.clone().try_connect(shared_state), - Connection::File(conn) => conn.clone().try_connect(shared_state), - Connection::Serial(conn) => conn.clone().try_connect(shared_state), - } - } -} - -#[derive(Debug)] -pub struct ConnectionState { - pub handler: Option>, - shared_state: SharedState, - sender: Sender>, - receiver: Receiver>, -} -impl ConnectionState { - pub fn new(client_send: ClientSender, shared_state: SharedState) -> ConnectionState { - let (sender, receiver) = unbounded(); - - ConnectionState { - handler: Some(ConnectionState::connect_thread( - client_send, - shared_state.clone(), - receiver.clone(), - )), - shared_state, - sender, - receiver, - } - } - - fn connect_thread( - client_send: ClientSender, - shared_state: SharedState, - receiver: Receiver>, - ) -> JoinHandle<()> { - thread::spawn(move || { - let mut conn = None; - info!("Console started..."); - while shared_state.clone().is_server_running() { - if let Ok(conn_option) = receiver.recv_timeout(Duration::from_secs_f64( - SERVER_STATE_CONNECTION_LOOP_TIMEOUT_SEC, - )) { - match conn_option { - Some(conn_) => { - conn = Some(conn_); - } - None => { - conn = None; - info!("Disconnected successfully."); - } - } - } - if let Some(conn_) = conn.clone() { - if let Err(e) = - process_messages(conn_, shared_state.clone(), client_send.clone()) - { - error!("unable to process messages, {}", e); - } - if !shared_state.is_running() { - conn = None; - } - shared_state.set_running(false, client_send.clone()); - } - log::logger().flush(); - } - }) - } - - /// Helper function for attempting to open a file and process SBP messages from it. - /// - /// # Parameters - /// - `filename`: The path to the filename to be read for SBP messages. - pub fn connect_to_file( - &self, - filename: String, - realtime_delay: RealtimeDelay, - close_when_done: bool, - ) { - let conn = Connection::file(filename, realtime_delay, close_when_done); - self.connect(conn); - } - - /// Helper function for attempting to open a tcp connection and process SBP messages from it. - /// - /// # Parameters - /// - `host`: The host portion of the TCP stream to open. - /// - `port`: The port to be used to open a TCP stream. - pub fn connect_to_host(&self, host: String, port: u16) { - let conn = Connection::tcp(host, port); - self.connect(conn); - } - - /// Helper function for attempting to open a serial port and process SBP messages from it. - /// - /// # Parameters - /// - `device`: The string path corresponding to the serial device to connect with. - /// - `baudrate`: The baudrate to use when communicating with the serial device. - /// - `flow`: The flow control mode to use when communicating with the serial device. - pub fn connect_to_serial(&self, device: String, baudrate: u32, flow: FlowControl) { - let conn = Connection::serial(device, baudrate, flow); - self.connect(conn); - } - - /// Send disconnect signal to server state loop. - pub fn disconnect(&self, client_send: S) { - self.shared_state.set_running(false, client_send); - if let Err(err) = self.sender.try_send(None) { - error!("{}, {}", SERVER_STATE_DISCONNECT_FAILURE, err); - } - } - - /// Helper function to send connection object to server state loop. - fn connect(&self, conn: Connection) { - if let Err(err) = self.sender.try_send(Some(conn)) { - error!("{}, {}", SERVER_STATE_NEW_CONNECTION_FAILURE, err); - } - } -} - -impl Drop for ConnectionState { - fn drop(&mut self) { - self.shared_state.stop_server_running(); - } -} - #[cfg(test)] mod tests { use super::*; @@ -335,8 +408,8 @@ mod tests { }; const TEST_FILEPATH: &str = "./tests/data/piksi-relay-1min.sbp"; const TEST_SHORT_FILEPATH: &str = "./tests/data/piksi-relay.sbp"; - const SBP_FILE_SHORT_DURATION_SEC: f64 = 27.1; - const DELAY_BEFORE_CHECKING_APP_STARTED_IN_MS: u64 = 150; + const SBP_FILE_SHORT_DURATION: Duration = Duration::from_millis(27100); + const DELAY_BEFORE_CHECKING_APP_STARTED: Duration = Duration::from_millis(1000); #[test] fn create_tcp() { @@ -399,89 +472,52 @@ mod tests { shared_state.set_debug(true); let (client_send_, client_receive) = channel::unbounded::>(); let client_send = ClientSender::new(client_send_); - let connection_state = ConnectionState::new(client_send, shared_state.clone()); + let conn_manager = ConnectionManager::new(client_send, shared_state.clone()); let filename = TEST_SHORT_FILEPATH.to_string(); receive_thread(client_receive); - assert!(!shared_state.is_running()); - connection_state.connect_to_file( + assert!(!shared_state.connection().is_connected()); + conn_manager.connect_to_file( filename, RealtimeDelay::On, /*close_when_done = */ true, ); - sleep(Duration::from_millis( - DELAY_BEFORE_CHECKING_APP_STARTED_IN_MS, - )); - assert!(shared_state.is_running()); + sleep(DELAY_BEFORE_CHECKING_APP_STARTED); + assert!(shared_state.connection().is_connected()); // TODO: [CPP-272] Reassess timing on pause unittest for Windows - sleep(Duration::from_secs_f64(SBP_FILE_SHORT_DURATION_SEC + 1.0)); - assert!(!shared_state.is_running()); + sleep(SBP_FILE_SHORT_DURATION + Duration::from_secs(1)); + drop(conn_manager); + assert!(!shared_state.connection().is_connected()); restore_backup_file(bfilename); } #[test] #[serial] - fn pause_via_connect_to_file_test() { + fn disconnect_via_connect_to_file_test() { let bfilename = filename(); backup_file(bfilename.clone()); let shared_state = SharedState::new(); shared_state.set_debug(true); let (client_send_, client_receive) = channel::unbounded::>(); let client_send = ClientSender::new(client_send_); - let connection_state = ConnectionState::new(client_send, shared_state.clone()); - let filename = TEST_SHORT_FILEPATH.to_string(); - receive_thread(client_receive); - assert!(!shared_state.is_running()); - connection_state.connect_to_file( + let conn_manager = ConnectionManager::new(client_send.clone(), shared_state.clone()); + let filename = TEST_FILEPATH.to_string(); + let expected_duration = Duration::from_secs(1); + let handle = receive_thread(client_receive); + assert!(!shared_state.connection().is_connected()); + conn_manager.connect_to_file( filename, RealtimeDelay::On, /*close_when_done = */ true, ); - sleep(Duration::from_millis( - DELAY_BEFORE_CHECKING_APP_STARTED_IN_MS, - )); - assert!(shared_state.is_running()); - shared_state.set_paused(true); - sleep(Duration::from_secs_f64(SBP_FILE_SHORT_DURATION_SEC)); - assert!(shared_state.is_running()); - shared_state.set_paused(false); - // TODO: [CPP-272] Reassess timing on pause unittest for Windows - sleep(Duration::from_secs_f64(SBP_FILE_SHORT_DURATION_SEC + 1.0)); - assert!(!shared_state.is_running()); - restore_backup_file(bfilename); - } - - #[test] - #[serial] - fn disconnect_via_connect_to_file_test() { - let bfilename = filename(); - backup_file(bfilename.clone()); - let shared_state = SharedState::new(); - shared_state.set_debug(true); - let (client_send_, client_receive) = channel::unbounded::>(); - let client_send = ClientSender::new(client_send_); - let connection_state = ConnectionState::new(client_send.clone(), shared_state.clone()); - let filename = TEST_FILEPATH.to_string(); - let expected_duration = Duration::from_secs_f64(SERVER_STATE_CONNECTION_LOOP_TIMEOUT_SEC) - + Duration::from_millis(100); - let handle = receive_thread(client_receive); - assert!(!shared_state.is_running()); - { - connection_state.connect_to_file( - filename, - RealtimeDelay::On, - /*close_when_done = */ true, - ); - } - - sleep(Duration::from_millis(5)); - assert!(shared_state.is_running()); + sleep(Duration::from_millis(50)); + assert!(shared_state.connection().is_connected()); let now = SystemTime::now(); sleep(Duration::from_millis(1)); - connection_state.disconnect(client_send.clone()); + conn_manager.disconnect(); sleep(Duration::from_millis(10)); - assert!(!shared_state.is_running()); - shared_state.stop_server_running(); + assert!(!shared_state.connection().is_connected()); drop(client_send); + drop(conn_manager); assert!(handle.join().is_ok()); match now.elapsed() { diff --git a/console_backend/src/constants.rs b/console_backend/src/constants.rs index 2eaabf6dd..ab2924d51 100644 --- a/console_backend/src/constants.rs +++ b/console_backend/src/constants.rs @@ -8,9 +8,6 @@ pub(crate) const APPLICATION_ORGANIZATION: &str = "swift-nav"; pub(crate) const APPLICATION_NAME: &str = "swift_navigation_console"; // CLI constants. -// Process Message constants. -pub(crate) const PAUSE_LOOP_SLEEP_DURATION_MS: u64 = 100; - // Logging constants #[allow(dead_code)] pub(crate) const LOG_WRITER_BUFFER_MESSAGE_COUNT: usize = 50; @@ -248,5 +245,3 @@ pub(crate) const UNKNOWN_ERROR: &str = "Unk Error"; pub(crate) const UNKNOWN_ERROR_SHORT: &str = "Unk"; pub(crate) const ODO_POSTFIX: &str = "+Odo"; pub(crate) const INS_POSTFIX: &str = "+INS"; - -pub(crate) const SERVER_STATE_CONNECTION_LOOP_TIMEOUT_SEC: f64 = 1.0; diff --git a/console_backend/src/errors.rs b/console_backend/src/errors.rs index a335f1f80..a95cb1c42 100644 --- a/console_backend/src/errors.rs +++ b/console_backend/src/errors.rs @@ -10,8 +10,6 @@ pub(crate) const UNABLE_TO_SEND_INS_UPDATE_FAILURE: &str = "unable to send an in pub(crate) const THREAD_JOIN_FAILURE: &str = "thread join failure"; pub(crate) const TCP_CONNECTION_PARSING_FAILURE: &str = "unable to parse the provided string for ip string"; -pub(crate) const SERVER_STATE_NEW_CONNECTION_FAILURE: &str = "server state new connection failure"; -pub(crate) const SERVER_STATE_DISCONNECT_FAILURE: &str = "server state disconnect failure"; pub(crate) const CONSOLE_LOG_JSON_TO_STRING_FAILURE: &str = "unable to convert json to string"; pub(crate) const CROSSBEAM_SCOPE_UNWRAP_FAILURE: &str = "unable to unwrap crossbeam scope"; pub(crate) const UNABLE_TO_CLONE_UPDATE_SHARED: &str = "unable to clone update shared"; diff --git a/console_backend/src/lib.rs b/console_backend/src/lib.rs index 89429b772..43b64e1d5 100644 --- a/console_backend/src/lib.rs +++ b/console_backend/src/lib.rs @@ -39,6 +39,7 @@ pub mod types; pub mod update_downloader; pub mod update_tab; pub mod utils; +pub mod watch; use std::sync::Mutex; @@ -47,8 +48,8 @@ use crate::{ advanced_networking_tab::AdvancedNetworkingTab, advanced_spectrum_analyzer_tab::AdvancedSpectrumAnalyzerTab, advanced_system_monitor_tab::AdvancedSystemMonitorTab, baseline_tab::BaselineTab, - main_tab::MainTab, observation_tab::ObservationTab, settings_tab::SettingsTab, - solution_tab::SolutionTab, solution_velocity_tab::SolutionVelocityTab, status_bar::StatusBar, + main_tab::MainTab, observation_tab::ObservationTab, solution_tab::SolutionTab, + solution_velocity_tab::SolutionVelocityTab, status_bar::StatusBar, tracking_signals_tab::TrackingSignalsTab, tracking_sky_plot_tab::TrackingSkyPlotTab, update_tab::UpdateTab, }; @@ -56,7 +57,7 @@ use crate::{ #[global_allocator] static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; -struct Tabs<'link, S: types::CapnProtoSender> { +struct Tabs { pub main: Mutex>, pub advanced_ins: Mutex>, pub advanced_magnetometer: Mutex>, @@ -71,15 +72,13 @@ struct Tabs<'link, S: types::CapnProtoSender> { pub advanced_spectrum_analyzer: Mutex>, pub status_bar: Mutex>, pub update: Mutex, - pub settings_tab: Mutex>, } -impl<'link, S: types::CapnProtoSender> Tabs<'link, S> { +impl Tabs { fn new( shared_state: shared_state::SharedState, client_sender: S, msg_sender: types::MsgSender, - link: sbp::link::Link<'link, ()>, ) -> Self { Self { main: MainTab::new(shared_state.clone(), client_sender.clone()).into(), @@ -101,12 +100,8 @@ impl<'link, S: types::CapnProtoSender> Tabs<'link, S> { msg_sender.clone(), ) .into(), - baseline: BaselineTab::new( - shared_state.clone(), - client_sender.clone(), - msg_sender.clone(), - ) - .into(), + baseline: BaselineTab::new(shared_state.clone(), client_sender.clone(), msg_sender) + .into(), tracking_signals: TrackingSignalsTab::new(shared_state.clone(), client_sender.clone()) .into(), tracking_sky_plot: TrackingSkyPlotTab::new(client_sender.clone(), shared_state.clone()) @@ -123,9 +118,8 @@ impl<'link, S: types::CapnProtoSender> Tabs<'link, S> { client_sender.clone(), ) .into(), - status_bar: StatusBar::new(shared_state.clone(), client_sender.clone()).into(), - update: UpdateTab::new(shared_state.clone()).into(), - settings_tab: SettingsTab::new(shared_state, client_sender, msg_sender, link).into(), + status_bar: StatusBar::new(shared_state.clone(), client_sender).into(), + update: UpdateTab::new(shared_state).into(), } } } diff --git a/console_backend/src/process_messages.rs b/console_backend/src/process_messages.rs index b297998fd..8999132fd 100644 --- a/console_backend/src/process_messages.rs +++ b/console_backend/src/process_messages.rs @@ -1,6 +1,6 @@ -use std::{io::ErrorKind, thread::sleep, time::Duration}; +use std::{io, sync::Arc, thread}; -use crossbeam::sync::Parker; +use crossbeam::channel::{Receiver, Sender}; use log::{debug, error}; use sbp::{ link::LinkSource, @@ -17,62 +17,37 @@ use sbp::{ }, tracking::{MsgMeasurementState, MsgTrackingState}, }, - sbp_iter_ext::{ControlFlow, SbpIterExt}, - SbpMessage, + SbpIterExt, SbpMessage, }; -use crate::constants::PAUSE_LOOP_SLEEP_DURATION_MS; -use crate::errors::UNABLE_TO_CLONE_UPDATE_SHARED; -use crate::log_panel::handle_log_msg; -use crate::shared_state::SharedState; +use crate::types::UartState; use crate::types::{ BaselineNED, CapnProtoSender, Dops, GpsTime, MsgSender, ObservationMsg, PosLLH, RealtimeDelay, - Result, Specan, VelNED, + Specan, VelNED, }; use crate::update_tab; -use crate::utils::{close_frontend, refresh_navbar}; +use crate::utils::refresh_navbar; use crate::Tabs; -use crate::{connection::Connection, types::UartState}; +use crate::{connection::Connection, shared_state::SharedState}; +use crate::{errors::UNABLE_TO_CLONE_UPDATE_SHARED, settings_tab}; +use crate::{log_panel::handle_log_msg, settings_tab::SettingsTab}; pub fn process_messages( + messages: Messages, + msg_sender: MsgSender, conn: Connection, shared_state: SharedState, - mut client_send: S, -) -> Result<()> -where + client_send: S, +) where S: CapnProtoSender, { - shared_state.set_running(true, client_send.clone()); - shared_state.set_settings_refresh(conn.settings_enabled()); - let realtime_delay = conn.realtime_delay(); - let (rdr, writer) = conn.try_connect(Some(shared_state.clone()))?; - let msg_sender = MsgSender::new(writer); - shared_state.set_current_connection(conn.name()); refresh_navbar(&mut client_send.clone(), shared_state.clone()); - let messages = { - let state = shared_state.clone(); - let client = client_send.clone(); - sbp::iter_messages(rdr) - .handle_errors(move |e| { - debug!("{}", e); - match e { - sbp::DeserializeError::IoError(err) => { - if (*err).kind() == ErrorKind::TimedOut { - state.set_running(false, client.clone()); - } - ControlFlow::Break - } - _ => ControlFlow::Continue, - } - }) - .with_rover_time() - }; + let source: LinkSource> = LinkSource::new(); let tabs = Tabs::new( shared_state.clone(), client_send.clone(), msg_sender.clone(), - source.stateless_link(), ); let link = source.link(); @@ -276,49 +251,37 @@ where .expect(UNABLE_TO_CLONE_UPDATE_SHARED) .clone_update_tab_context(); update_tab_context.set_serial_prompt(conn.is_serial()); - let client_send_clone = client_send.clone(); let (update_tab_tx, update_tab_rx) = tabs.update.lock().unwrap().clone_channel(); - let settings_parker = Parker::new(); - let settings_unparker = settings_parker.unparker().clone(); + let settings_tab = conn.settings_enabled().then(|| { + SettingsTab::new( + shared_state.clone(), + client_send.clone(), + msg_sender.clone(), + source.stateless_link(), + ) + }); crossbeam::scope(|scope| { - let settings_tab = &tabs.settings_tab; - let settings_shared_state = shared_state.clone(); - scope.spawn(move |_| { - while settings_shared_state.is_running() { - settings_tab.lock().unwrap().tick(); - settings_parker.park(); - } - }); scope.spawn(|_| { update_tab::update_tab_thread( update_tab_tx.clone(), update_tab_rx, update_tab_context, shared_state.clone(), - client_send_clone, + client_send.clone(), source.stateless_link(), msg_sender.clone(), ); }); + + if conn.settings_enabled() { + scope.spawn(|_| { + let tab = settings_tab.as_ref().unwrap(); + shared_state.set_settings_refresh(true); + settings_tab::start_thd(tab); + }); + } + for (message, gps_time) in messages { - if shared_state.settings_needs_update() { - settings_unparker.unpark(); - } - if !shared_state.is_running() { - if let Err(e) = tabs.main.lock().unwrap().end_csv_logging() { - error!("Issue closing csv file, {}", e); - } - tabs.main.lock().unwrap().close_sbp(); - break; - } - if shared_state.is_paused() { - loop { - if !shared_state.is_paused() { - break; - } - sleep(Duration::from_millis(PAUSE_LOOP_SLEEP_DURATION_MS)); - } - } let sent = source.send_with_state(&tabs, &message); tabs.main.lock().unwrap().serialize_sbp(&message); tabs.status_bar @@ -329,7 +292,7 @@ where .lock() .unwrap() .handle_sbp(&message); - if let RealtimeDelay::On = realtime_delay { + if let RealtimeDelay::On = conn.realtime_delay() { if sent { tabs.main.lock().unwrap().realtime_delay(gps_time); } else { @@ -341,16 +304,90 @@ where } log::logger().flush(); } + if let Some(ref tab) = settings_tab { + tab.stop() + } if let Err(err) = update_tab_tx.send(None) { error!("Issue stopping update tab: {}", err); } - if conn.close_when_done() { - shared_state.set_running(false, client_send.clone()); - close_frontend(&mut client_send); + if let Err(e) = tabs.main.lock().unwrap().end_csv_logging() { + error!("Issue closing csv file, {}", e); } - settings_unparker.unpark(); + tabs.main.lock().unwrap().close_sbp(); }) .unwrap(); +} + +type Message = ( + sbp::Sbp, + Option>, +); - Ok(()) +/// Wrapper around the iterator of messages that enables other threads +/// to stop the iterator. +pub struct Messages { + messages: Receiver, + canceled: Receiver<()>, +} + +impl Messages { + pub fn new(reader: R) -> (StopToken, Self) + where + R: io::Read + Send + 'static, + { + let (sink, messages) = crossbeam::channel::bounded(100); + let (cancel, canceled) = crossbeam::channel::bounded(1); + thread::spawn(move || Messages::start_thd(reader, sink)); + (StopToken(Arc::new(cancel)), Self { messages, canceled }) + } + + fn start_thd(reader: R, sink: Sender) { + for message in sbp::iter_messages(reader) + .log_errors(log::Level::Debug) + .with_rover_time() + { + // this will error after `Messages` is dropped which will + // stop this thread + if sink.send(message).is_err() { + break; + } + } + } +} + +impl Iterator for Messages { + type Item = Message; + + fn next(&mut self) -> Option { + crossbeam::select! { + recv(self.canceled) -> _ => None, + recv(self.messages) -> msg => msg.ok(), + } + } +} + +/// Used to break the `process_messages` loop. Can be stopped manually +/// or will automatically stop after all copies of this have been dropped. +#[derive(Debug)] +pub struct StopToken(Arc>); + +impl StopToken { + pub fn stop(&self) { + let _ = self.0.try_send(()); + } +} + +impl Clone for StopToken { + fn clone(&self) -> Self { + Self(Arc::clone(&self.0)) + } +} + +impl Drop for StopToken { + fn drop(&mut self) { + // if this is the last one make sure we stop the thread + if Arc::strong_count(&self.0) == 1 { + self.stop(); + } + } } diff --git a/console_backend/src/server.rs b/console_backend/src/server.rs index c261d4856..e582df068 100644 --- a/console_backend/src/server.rs +++ b/console_backend/src/server.rs @@ -6,7 +6,7 @@ use pyo3::types::PyBytes; use std::time; use crate::cli_options::*; -use crate::connection::ConnectionState; +use crate::connection::ConnectionManager; use crate::log_panel::setup_logging; use crate::server_recv_thread::server_recv_thread; @@ -113,12 +113,12 @@ impl Server { let opt = CliOptions::from_filtered_cli(); let shared_state = SharedState::new(); - let connection_state = ConnectionState::new(client_send.clone(), shared_state.clone()); + let conn_manager = ConnectionManager::new(client_send.clone(), shared_state.clone()); // Handle CLI Opts. - handle_cli(opt, &connection_state, shared_state.clone()); + handle_cli(opt, &conn_manager, shared_state.clone()); refresh_navbar(&mut client_send.clone(), shared_state.clone()); refresh_loggingbar(&mut client_send.clone(), shared_state.clone()); - server_recv_thread(connection_state, client_send, server_recv, shared_state); + server_recv_thread(conn_manager, client_send, server_recv, shared_state); Ok(server_endpoint) } } diff --git a/console_backend/src/server_recv_thread.rs b/console_backend/src/server_recv_thread.rs index 26144b202..8abe12fd2 100644 --- a/console_backend/src/server_recv_thread.rs +++ b/console_backend/src/server_recv_thread.rs @@ -1,5 +1,5 @@ use crate::common_constants::SbpLogging; -use crate::connection::ConnectionState; +use crate::connection::ConnectionManager; use crate::console_backend_capnp as m; use crate::errors::{ CAP_N_PROTO_DESERIALIZATION_FAILURE, CONVERT_TO_STR_FAILURE, SHARED_STATE_LOCK_MUTEX_FAILURE, @@ -17,354 +17,323 @@ use capnp::serialize; use chrono::{DateTime, Utc}; use crossbeam::channel; use log::{error, info}; -use std::{ - io::{BufReader, Cursor}, - path::PathBuf, - str::FromStr, - thread, -}; +use std::{io::Cursor, path::PathBuf, str::FromStr, thread}; pub type Error = anyhow::Error; pub type Result = anyhow::Result; pub type UtcDateTime = DateTime; pub fn server_recv_thread( - connection_state: ConnectionState, + conn_manager: ConnectionManager, client_send: ClientSender, server_recv: channel::Receiver>, shared_state: SharedState, ) { thread::spawn(move || { - let client_send_clone = client_send.clone(); loop { log::logger().flush(); - let buf = server_recv.recv(); - if let Ok(buf) = buf { - let mut buf_reader = BufReader::new(Cursor::new(buf)); - let message_reader = serialize::read_message( - &mut buf_reader, - ::capnp::message::ReaderOptions::new(), - ) - .unwrap(); - let message = message_reader - .get_root::() - .expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); - let message = match message.which() { - Ok(msg) => msg, - Err(e) => { - error!("error reading message: {}", e); - continue; - } - }; - let shared_state_clone = shared_state.clone(); - match message { - m::message::SerialRefreshRequest(Ok(_)) => { - refresh_navbar(&mut client_send_clone.clone(), shared_state_clone); - } - m::message::DisconnectRequest(Ok(_)) => { - connection_state.disconnect(client_send_clone.clone()); - } - m::message::FileRequest(Ok(req)) => { - let filename = req - .get_filename() - .expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); - let filename = filename.to_string(); - connection_state.connect_to_file( - filename, - RealtimeDelay::On, - /*close_when_done*/ false, - ); - } - m::message::PauseRequest(Ok(_)) => { - if shared_state_clone.is_paused() { - shared_state_clone.set_paused(false); - } else { - shared_state_clone.set_paused(true); - } - } - m::message::TcpRequest(Ok(req)) => { - let host = req.get_host().expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); - let port = req.get_port(); - connection_state.connect_to_host(host.to_string(), port); - } - m::message::SerialRequest(Ok(req)) => { - let device = req.get_device().expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); - let device = device.to_string(); - let baudrate = req.get_baudrate(); - let flow = req.get_flow_control().unwrap(); - let flow = FlowControl::from_str(flow).unwrap(); - connection_state.connect_to_serial(device, baudrate, flow); - } - m::message::TrackingSignalsStatusFront(Ok(cv_in)) => { - let check_visibility = cv_in - .get_tracking_signals_check_visibility() - .expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); - let check_visibility: Vec = check_visibility - .iter() - .map(|x| String::from(x.unwrap())) - .collect(); - let shared_state_clone = shared_state.clone(); - { - let mut shared_data = shared_state_clone - .lock() - .expect(SHARED_STATE_LOCK_MUTEX_FAILURE); - (*shared_data).tracking_tab.signals_tab.check_visibility = - check_visibility; - } - } - m::message::LoggingBarFront(Ok(cv_in)) => { - let directory = cv_in - .get_directory() - .expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); - shared_state.set_logging_directory(PathBuf::from(directory)); - let shared_state_clone = shared_state.clone(); - let mut shared_data = shared_state_clone - .lock() - .expect(SHARED_STATE_LOCK_MUTEX_FAILURE); - (*shared_data).logging_bar.csv_logging = - CsvLogging::from(cv_in.get_csv_logging()); - let sbp_logging = cv_in - .get_sbp_logging() - .expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); - (*shared_data).logging_bar.sbp_logging = - SbpLogging::from_str(sbp_logging).expect(CONVERT_TO_STR_FAILURE); + let mut reader = match server_recv.recv() { + Ok(buf) => Cursor::new(buf), + Err(_) => break, + }; + let message_reader = + serialize::read_message(&mut reader, capnp::message::ReaderOptions::new()).unwrap(); + let message = message_reader + .get_root::() + .expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); + let message = match message.which() { + Ok(msg) => msg, + Err(e) => { + error!("error reading message: {}", e); + continue; + } + }; + match message { + m::message::SerialRefreshRequest(Ok(_)) => { + refresh_navbar(&mut client_send.clone(), shared_state.clone()); + } + m::message::DisconnectRequest(Ok(_)) => { + conn_manager.disconnect(); + } + m::message::FileRequest(Ok(req)) => { + let filename = req + .get_filename() + .expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); + let filename = filename.to_string(); + conn_manager.connect_to_file( + filename, + RealtimeDelay::On, + /*close_when_done*/ false, + ); + } + m::message::TcpRequest(Ok(req)) => { + let host = req.get_host().expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); + let port = req.get_port(); + conn_manager.connect_to_host(host.to_string(), port); + } + m::message::SerialRequest(Ok(req)) => { + let device = req.get_device().expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); + let device = device.to_string(); + let baudrate = req.get_baudrate(); + let flow = req.get_flow_control().unwrap(); + let flow = FlowControl::from_str(flow).unwrap(); + conn_manager.connect_to_serial(device, baudrate, flow); + } + m::message::TrackingSignalsStatusFront(Ok(cv_in)) => { + let check_visibility = cv_in + .get_tracking_signals_check_visibility() + .expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); + let check_visibility: Vec = check_visibility + .iter() + .map(|x| String::from(x.unwrap())) + .collect(); + let shared_state = shared_state.clone(); + { + let mut shared_data = + shared_state.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); + (*shared_data).tracking_tab.signals_tab.check_visibility = check_visibility; } - m::message::LogLevelFront(Ok(cv_in)) => { - let shared_state_clone = shared_state.clone(); - let log_level = cv_in - .get_log_level() - .expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); - let log_level = - LogLevel::from_str(log_level).expect(CONVERT_TO_STR_FAILURE); - info!("Log Level: {}", log_level); - shared_state_clone.set_log_level(log_level); - refresh_navbar(&mut client_send.clone(), shared_state.clone()); + } + m::message::LoggingBarFront(Ok(cv_in)) => { + let directory = cv_in + .get_directory() + .expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); + shared_state.set_logging_directory(PathBuf::from(directory)); + let shared_state = shared_state.clone(); + let mut shared_data = + shared_state.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); + (*shared_data).logging_bar.csv_logging = + CsvLogging::from(cv_in.get_csv_logging()); + let sbp_logging = cv_in + .get_sbp_logging() + .expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); + (*shared_data).logging_bar.sbp_logging = + SbpLogging::from_str(sbp_logging).expect(CONVERT_TO_STR_FAILURE); + } + m::message::LogLevelFront(Ok(cv_in)) => { + let shared_state = shared_state.clone(); + let log_level = cv_in + .get_log_level() + .expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); + let log_level = LogLevel::from_str(log_level).expect(CONVERT_TO_STR_FAILURE); + info!("Log Level: {}", log_level); + shared_state.set_log_level(log_level); + refresh_navbar(&mut client_send.clone(), shared_state.clone()); + } + m::message::SolutionVelocityStatusFront(Ok(cv_in)) => { + let unit = cv_in + .get_solution_velocity_unit() + .expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); + let shared_state = shared_state.clone(); + { + let mut shared_data = + shared_state.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); + (*shared_data).solution_tab.velocity_tab.unit = unit.to_string(); } - m::message::SolutionVelocityStatusFront(Ok(cv_in)) => { - let unit = cv_in - .get_solution_velocity_unit() - .expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); - let shared_state_clone = shared_state.clone(); + } + m::message::SolutionPositionStatusUnitFront(Ok(cv_in)) => { + let shared_state = shared_state.clone(); + let mut shared_data = + shared_state.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); + let unit = cv_in + .get_solution_position_unit() + .expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); + (*shared_data).solution_tab.position_tab.unit = Some( + LatLonUnits::from_str(unit) + .expect(SOLUTION_POSITION_UNIT_SELECTION_NOT_AVAILABLE), + ); + } + m::message::SolutionPositionStatusButtonFront(Ok(cv_in)) => { + let shared_state = shared_state.clone(); + let mut shared_data = + shared_state.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); + (*shared_data).solution_tab.position_tab.clear = + cv_in.get_solution_position_clear(); + (*shared_data).solution_tab.position_tab.pause = + cv_in.get_solution_position_pause(); + } + m::message::BaselinePlotStatusButtonFront(Ok(cv_in)) => { + let shared_state = shared_state.clone(); + let mut shared_data = + shared_state.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); + (*shared_data).baseline_tab.clear = cv_in.get_clear(); + (*shared_data).baseline_tab.pause = cv_in.get_pause(); + (*shared_data).baseline_tab.reset = cv_in.get_reset_filters(); + } + m::message::AdvancedSpectrumAnalyzerStatusFront(Ok(cv_in)) => { + let shared_state = shared_state.clone(); + let mut shared_data = + shared_state.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); + (*shared_data).advanced_spectrum_analyzer_tab.channel_idx = cv_in.get_channel(); + } + m::message::UpdateTabStatusFront(Ok(cv_in)) => { + if let Some(update_tab_sender) = shared_state.update_tab_sender() { + let download_latest_firmware = cv_in.get_download_latest_firmware(); + let update_firmware = cv_in.get_update_firmware(); + let send_file_to_device = cv_in.get_send_file_to_device(); + let serial_prompt_confirm = cv_in.get_serial_prompt_confirm(); + let firmware_directory = match cv_in.get_download_directory().which() { + Ok(m::update_tab_status_front::download_directory::Directory(Ok( + directory, + ))) => Some(PathBuf::from(directory)), + Err(e) => { + error!("{}", e); + None + } + _ => None, + }; + let firmware_local_filepath = match cv_in + .get_update_local_filepath() + .which() { - let mut shared_data = shared_state_clone - .lock() - .expect(SHARED_STATE_LOCK_MUTEX_FAILURE); - (*shared_data).solution_tab.velocity_tab.unit = unit.to_string(); - } - } - m::message::SolutionPositionStatusUnitFront(Ok(cv_in)) => { - let shared_state_clone = shared_state.clone(); - let mut shared_data = shared_state_clone - .lock() - .expect(SHARED_STATE_LOCK_MUTEX_FAILURE); - let unit = cv_in - .get_solution_position_unit() - .expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); - (*shared_data).solution_tab.position_tab.unit = Some( - LatLonUnits::from_str(unit) - .expect(SOLUTION_POSITION_UNIT_SELECTION_NOT_AVAILABLE), - ); - } - m::message::SolutionPositionStatusButtonFront(Ok(cv_in)) => { - let shared_state_clone = shared_state.clone(); - let mut shared_data = shared_state_clone - .lock() - .expect(SHARED_STATE_LOCK_MUTEX_FAILURE); - (*shared_data).solution_tab.position_tab.clear = - cv_in.get_solution_position_clear(); - (*shared_data).solution_tab.position_tab.pause = - cv_in.get_solution_position_pause(); - } - m::message::BaselinePlotStatusButtonFront(Ok(cv_in)) => { - let shared_state_clone = shared_state.clone(); - let mut shared_data = shared_state_clone - .lock() - .expect(SHARED_STATE_LOCK_MUTEX_FAILURE); - (*shared_data).baseline_tab.clear = cv_in.get_clear(); - (*shared_data).baseline_tab.pause = cv_in.get_pause(); - (*shared_data).baseline_tab.reset = cv_in.get_reset_filters(); - } - m::message::AdvancedSpectrumAnalyzerStatusFront(Ok(cv_in)) => { - let shared_state_clone = shared_state.clone(); - let mut shared_data = shared_state_clone - .lock() - .expect(SHARED_STATE_LOCK_MUTEX_FAILURE); - (*shared_data).advanced_spectrum_analyzer_tab.channel_idx = - cv_in.get_channel(); - } - m::message::UpdateTabStatusFront(Ok(cv_in)) => { - if let Some(update_tab_sender) = shared_state.update_tab_sender() { - let download_latest_firmware = cv_in.get_download_latest_firmware(); - let update_firmware = cv_in.get_update_firmware(); - let send_file_to_device = cv_in.get_send_file_to_device(); - let serial_prompt_confirm = cv_in.get_serial_prompt_confirm(); - let firmware_directory = match cv_in.get_download_directory().which() { - Ok(m::update_tab_status_front::download_directory::Directory( - Ok(directory), - )) => Some(PathBuf::from(directory)), - Err(e) => { - error!("{}", e); - None - } - _ => None, - }; - let firmware_local_filepath = - match cv_in.get_update_local_filepath().which() { - Ok( - m::update_tab_status_front::update_local_filepath::Filepath( - Ok(filepath), - ), - ) => Some(PathBuf::from(filepath)), - Err(e) => { - error!("{}", e); - None - } - _ => None, - }; - let firmware_local_filename = - match cv_in.get_update_local_filename().which() { - Ok( - m::update_tab_status_front::update_local_filename::Filepath( - Ok(filepath), - ), - ) => Some(PathBuf::from(filepath)), - Err(e) => { - error!("{}", e); - None - } - _ => None, - }; - let fileio_local_filepath = - match cv_in.get_fileio_local_filepath().which() { - Ok( - m::update_tab_status_front::fileio_local_filepath::Filepath( - Ok(filepath), - ), - ) => Some(PathBuf::from(filepath)), - Err(e) => { - error!("{}", e); - None - } - _ => None, - }; - let fileio_destination_filepath = match cv_in.get_fileio_destination_filepath().which() { - Ok( - m::update_tab_status_front::fileio_destination_filepath::Filepath( - Ok(filepath), - ), - ) => { - Some(PathBuf::from(filepath)) - } - Err(e) => { - error!("{}", e); - None - } - _ => None, - }; - if let Err(err) = update_tab_sender.send(Some(UpdateTabUpdate { - download_latest_firmware, - update_firmware, - send_file_to_device, - firmware_directory, - firmware_local_filepath, - firmware_local_filename, - fileio_local_filepath, - fileio_destination_filepath, - serial_prompt_confirm, - })) { - error!("{}", err); + Ok(m::update_tab_status_front::update_local_filepath::Filepath( + Ok(filepath), + )) => Some(PathBuf::from(filepath)), + Err(e) => { + error!("{}", e); + None } - } - } - m::message::SettingsRefreshRequest(Ok(_)) => { - shared_state_clone.set_settings_refresh(true); - } - m::message::SettingsResetRequest(Ok(_)) => { - shared_state_clone.set_settings_reset(true); - } - m::message::SettingsSaveRequest(Ok(_)) => { - shared_state_clone.set_settings_save(true); - } - m::message::SettingsExportRequest(Ok(path)) => { - let path = path.get_path().expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); - shared_state_clone.set_export_settings(Some(PathBuf::from(path))); - } - m::message::SettingsImportRequest(Ok(path)) => { - let path = path.get_path().expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); - shared_state_clone.set_import_settings(Some(PathBuf::from(path))); - } - m::message::SettingsWriteRequest(Ok(req)) => { - let group = req.get_group().expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); - let name = req.get_name().expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); - let value = req.get_value().expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); - let req = settings_tab::SaveRequest { - group: group.to_string(), - name: name.to_string(), - value: value.to_string(), + _ => None, }; - shared_state_clone.set_write_setting(Some(req)); - } - m::message::AdvancedSystemMonitorStatusFront(Ok(_)) => { - shared_state_clone.set_reset_device(true); - } - m::message::AdvancedNetworkingStatusFront(Ok(cv_in)) => { - let refresh = cv_in.get_refresh(); - let start = cv_in.get_start(); - let stop = cv_in.get_stop(); - let ip_address = match cv_in.get_ipv4_address().which() { - Ok(m::advanced_networking_status_front::ipv4_address::Address(Ok( - address, - ))) => Some(String::from(address)), + let firmware_local_filename = match cv_in + .get_update_local_filename() + .which() + { + Ok(m::update_tab_status_front::update_local_filename::Filepath( + Ok(filepath), + )) => Some(PathBuf::from(filepath)), Err(e) => { error!("{}", e); None } _ => None, }; - let port: Option = match cv_in.get_port().which() { - Ok(m::advanced_networking_status_front::port::Port(port)) => Some(port), + let fileio_local_filepath = match cv_in.get_fileio_local_filepath().which() + { + Ok(m::update_tab_status_front::fileio_local_filepath::Filepath( + Ok(filepath), + )) => Some(PathBuf::from(filepath)), Err(e) => { error!("{}", e); None } _ => None, }; - let all_messages: Option = match cv_in.get_all_messages().which() { - Ok(m::advanced_networking_status_front::all_messages::Toggle( - toggle, - )) => Some(toggle), + let fileio_destination_filepath = match cv_in + .get_fileio_destination_filepath() + .which() + { + Ok( + m::update_tab_status_front::fileio_destination_filepath::Filepath( + Ok(filepath), + ), + ) => Some(PathBuf::from(filepath)), Err(e) => { error!("{}", e); None } _ => None, }; - shared_state_clone.set_advanced_networking_update(AdvancedNetworkingState { - refresh, - start, - stop, - all_messages, - ip_address, - port, - }) - } - m::message::ConfirmInsChange(Ok(_)) => { - shared_state_clone.set_settings_confirm_ins_change(true); - } - m::message::AutoSurveyRequest(Ok(_)) => { - let mut shared_data = shared_state_clone - .lock() - .expect(SHARED_STATE_LOCK_MUTEX_FAILURE); - shared_data.auto_survey_data.requested = true; - } - _ => { - error!("unknown message from front-end"); + if let Err(err) = update_tab_sender.send(Some(UpdateTabUpdate { + download_latest_firmware, + update_firmware, + send_file_to_device, + firmware_directory, + firmware_local_filepath, + firmware_local_filename, + fileio_local_filepath, + fileio_destination_filepath, + serial_prompt_confirm, + })) { + error!("{}", err); + } } } - } else { - break; + m::message::AutoSurveyRequest(Ok(_)) => { + let mut shared_data = + shared_state.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); + shared_data.auto_survey_data.requested = true; + } + m::message::SettingsRefreshRequest(Ok(_)) => { + shared_state.set_settings_refresh(true); + } + m::message::SettingsResetRequest(Ok(_)) => { + shared_state.set_settings_reset(true); + } + m::message::SettingsSaveRequest(Ok(_)) => { + shared_state.set_settings_save(true); + } + m::message::SettingsExportRequest(Ok(path)) => { + let path = path.get_path().expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); + shared_state.set_export_settings(Some(PathBuf::from(path))); + } + m::message::SettingsImportRequest(Ok(path)) => { + let path = path.get_path().expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); + shared_state.set_import_settings(Some(PathBuf::from(path))); + } + m::message::SettingsWriteRequest(Ok(req)) => { + let group = req.get_group().expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); + let name = req.get_name().expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); + let value = req.get_value().expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); + let req = settings_tab::SaveRequest { + group: group.to_string(), + name: name.to_string(), + value: value.to_string(), + }; + shared_state.set_write_setting(Some(req)); + } + m::message::AdvancedSystemMonitorStatusFront(Ok(_)) => { + shared_state.set_reset_device(true); + } + m::message::AdvancedNetworkingStatusFront(Ok(cv_in)) => { + let refresh = cv_in.get_refresh(); + let start = cv_in.get_start(); + let stop = cv_in.get_stop(); + let ip_address = match cv_in.get_ipv4_address().which() { + Ok(m::advanced_networking_status_front::ipv4_address::Address(Ok( + address, + ))) => Some(String::from(address)), + Err(e) => { + error!("{}", e); + None + } + _ => None, + }; + let port: Option = match cv_in.get_port().which() { + Ok(m::advanced_networking_status_front::port::Port(port)) => Some(port), + Err(e) => { + error!("{}", e); + None + } + _ => None, + }; + let all_messages: Option = match cv_in.get_all_messages().which() { + Ok(m::advanced_networking_status_front::all_messages::Toggle(toggle)) => { + Some(toggle) + } + Err(e) => { + error!("{}", e); + None + } + _ => None, + }; + shared_state.set_advanced_networking_update(AdvancedNetworkingState { + refresh, + start, + stop, + all_messages, + ip_address, + port, + }) + } + m::message::ConfirmInsChange(Ok(_)) => { + shared_state.set_settings_confirm_ins_change(true); + } + _ => { + error!("unknown message from front-end"); + } } } eprintln!("client recv loop shutdown"); - client_send_clone.connected.set(false); + client_send.connected.set(false); }); } diff --git a/console_backend/src/settings_tab.rs b/console_backend/src/settings_tab.rs index c70b1c924..cc2bbb973 100644 --- a/console_backend/src/settings_tab.rs +++ b/console_backend/src/settings_tab.rs @@ -2,20 +2,21 @@ use std::borrow::Cow; use std::collections::BTreeMap; use std::fs; use std::path::Path; +use std::time::Duration; use anyhow::anyhow; use capnp::message::Builder; use ini::Ini; use lazy_static::lazy_static; use log::{debug, error, warn}; -use parking_lot::{MappedMutexGuard, Mutex, MutexGuard}; +use parking_lot::Mutex; use sbp::link::Link; use sbp::messages::piksi::MsgReset; use sbp::messages::settings::MsgSettingsSave; use sbp_settings::{Client, SettingKind, SettingValue}; use crate::errors::SHARED_STATE_LOCK_MUTEX_FAILURE; -use crate::shared_state::SharedState; +use crate::shared_state::{SettingsTabState, SharedState}; use crate::types::{CapnProtoSender, Error, MsgSender, Result}; use crate::utils::*; @@ -31,13 +32,67 @@ lazy_static! { ]; } +pub fn start_thd(tab: &SettingsTab) { + let mut recv = tab.shared_state.watch_settings_state(); + while let Ok(state) = recv.wait() { + tab.shared_state.set_settings_state(Default::default()); + tick(tab, state); + } +} + +fn tick(settings_tab: &SettingsTab, settings_state: SettingsTabState) { + if settings_state.refresh { + settings_tab.refresh(); + } + if let Some(ref path) = settings_state.export { + if let Err(e) = settings_tab.export(path) { + error!("Issue exporting settings, {}", e); + }; + } + if let Some(ref path) = settings_state.import { + if let Err(e) = settings_tab.import(path) { + error!("Issue importing settings, {}", e); + }; + } + if let Some(req) = settings_state.write { + if let Err(e) = settings_tab.write_setting(&req.group, &req.name, &req.value) { + error!("Issue writing setting, {}", e); + }; + } + if settings_state.reset { + if let Err(e) = settings_tab.reset(true) { + error!("Issue resetting settings {}", e); + }; + } + if settings_state.save { + if let Err(e) = settings_tab.save() { + error!("Issue saving settings, {}", e); + }; + } + if settings_state.confirm_ins_change { + if let Err(e) = settings_tab.confirm_ins_change() { + error!("Issue confirming INS change, {}", e); + }; + } + if settings_state.auto_survey_request { + if let Err(e) = settings_tab.auto_survey() { + error!("Issue running auto survey, {}", e); + }; + } +} + pub struct SettingsTab<'link, S> { - client_sender: S, shared_state: SharedState, - settings: Settings, - client: Mutex>>, - link: Link<'link, ()>, + client_sender: Mutex, msg_sender: MsgSender, + settings: Mutex, + sbp_client: Client<'link>, +} + +impl Drop for SettingsTab<'_, S> { + fn drop(&mut self) { + self.shared_state.reset_settings_state(); + } } impl<'link, S: CapnProtoSender> SettingsTab<'link, S> { @@ -48,66 +103,28 @@ impl<'link, S: CapnProtoSender> SettingsTab<'link, S> { link: Link<'link, ()>, ) -> SettingsTab<'link, S> { SettingsTab { - settings: Settings::new(), - client: Mutex::new(None), - client_sender, shared_state, - link, - msg_sender, + client_sender: Mutex::new(client_sender), + msg_sender: msg_sender.clone(), + settings: Mutex::new(Settings::new()), + sbp_client: Client::new(link, move |msg| msg_sender.send(msg).map_err(Into::into)), } } - pub fn tick(&mut self) { - let settings_state = self.shared_state.take_settings_state(); - if settings_state.refresh { - self.refresh(); - } - if let Some(ref path) = settings_state.export { - if let Err(e) = self.export(path) { - error!("Issue exporting settings, {}", e); - }; - } - if let Some(ref path) = settings_state.import { - if let Err(e) = self.import(path) { - error!("Issue importing settings, {}", e); - }; - } - if let Some(req) = settings_state.write { - if let Err(e) = self.write_setting(&req.group, &req.name, &req.value) { - error!("Issue writing setting, {}", e); - }; - } - if settings_state.reset { - if let Err(e) = self.reset(true) { - error!("Issue resetting settings {}", e); - }; - } - if settings_state.save { - if let Err(e) = self.save() { - error!("Issue saving settings, {}", e); - }; - } - if settings_state.confirm_ins_change { - if let Err(e) = self.confirm_ins_change() { - error!("Issue confirming INS change, {}", e); - }; - } - - if settings_state.auto_survey_request { - if let Err(e) = self.auto_survey() { - error!("Issue running auto survey, {}", e); - }; - } + pub fn stop(&self) { + self.shared_state.reset_settings_state(); } - pub fn refresh(&mut self) { + fn refresh(&self) { + self.settings.lock().clear_values(); self.read_all_settings(); self.send_table_data(); } - pub fn export(&self, path: &Path) -> Result<()> { + fn export(&self, path: &Path) -> Result<()> { let mut f = fs::File::create(path)?; - let groups = self.settings.groups(); + let settings = self.settings.lock(); + let groups = settings.groups(); let mut conf = Ini::new(); for group in groups.iter() { let mut section = conf.with_section(Some(&group[0].0.group)); @@ -120,7 +137,7 @@ impl<'link, S: CapnProtoSender> SettingsTab<'link, S> { Ok(()) } - pub fn import(&mut self, path: &Path) -> Result<()> { + fn import(&self, path: &Path) -> Result<()> { let mut f = fs::File::open(path)?; let conf = Ini::read_from(&mut f)?; for (group, prop) in conf.iter() { @@ -142,25 +159,27 @@ impl<'link, S: CapnProtoSender> SettingsTab<'link, S> { Ok(()) } - pub fn import_success(&mut self) { + fn import_success(&self) { let mut builder = Builder::new_default(); let msg = builder.init_root::(); let mut import_response = msg.init_settings_import_response(); import_response.set_status("success"); self.client_sender + .lock() .send_data(serialize_capnproto_builder(builder)); } - pub fn import_err(&mut self, err: &Error) { + fn import_err(&self, err: &Error) { let mut builder = Builder::new_default(); let msg = builder.init_root::(); let mut import_response = msg.init_settings_import_response(); import_response.set_status(&err.to_string()); self.client_sender + .lock() .send_data(serialize_capnproto_builder(builder)); } - pub fn reset(&self, reset_settings: bool) -> Result<()> { + fn reset(&self, reset_settings: bool) -> Result<()> { let flags = if reset_settings { 1 } else { 0 }; self.msg_sender.send( @@ -173,13 +192,13 @@ impl<'link, S: CapnProtoSender> SettingsTab<'link, S> { Ok(()) } - pub fn save(&self) -> Result<()> { + fn save(&self) -> Result<()> { self.msg_sender .send(MsgSettingsSave { sender_id: None }.into())?; Ok(()) } - pub fn auto_survey(&mut self) -> Result<()> { + fn auto_survey(&self) -> Result<()> { let (lat, lon, alt) = { let shared_data = self .shared_state @@ -205,9 +224,9 @@ impl<'link, S: CapnProtoSender> SettingsTab<'link, S> { Ok(()) } - pub fn confirm_ins_change(&mut self) -> Result<()> { - let ins_mode = self - .settings + fn confirm_ins_change(&self) -> Result<()> { + let settings = self.settings.lock(); + let ins_mode = settings .get("ins", "output_mode")? .value .as_ref() @@ -228,15 +247,12 @@ impl<'link, S: CapnProtoSender> SettingsTab<'link, S> { Ok(()) } - pub fn get_recommended_ins_setting_changes( - &self, - ) -> Result> { - let client = self.client(); - + fn get_recommended_ins_setting_changes(&self) -> Result> { let mut recommended_changes = vec![]; for setting in RECOMMENDED_INS_SETTINGS.iter() { - let value = client + let value = self + .sbp_client .read_setting(setting.0, setting.1) .ok_or_else(|| anyhow!("setting not found"))??; if value != setting.2 { @@ -252,7 +268,7 @@ impl<'link, S: CapnProtoSender> SettingsTab<'link, S> { Ok(recommended_changes) } - pub fn send_ins_change_response(&mut self, output_mode: &str) -> Result<()> { + fn send_ins_change_response(&self, output_mode: &str) -> Result<()> { let mut builder = Builder::new_default(); let msg = builder.init_root::(); let mut ins_resp = msg.init_ins_settings_change_response(); @@ -273,13 +289,15 @@ impl<'link, S: CapnProtoSender> SettingsTab<'link, S> { } self.client_sender + .lock() .send_data(serialize_capnproto_builder(builder)); Ok(()) } - pub fn write_setting(&mut self, group: &str, name: &str, value: &str) -> Result<()> { - let setting = self.settings.get(group, name)?; + fn write_setting(&self, group: &str, name: &str, value: &str) -> Result<()> { + let settings = self.settings.lock(); + let setting = settings.get(group, name)?; if let Some(ref v) = setting.value { if v.to_string() == value { @@ -288,19 +306,20 @@ impl<'link, S: CapnProtoSender> SettingsTab<'link, S> { } } - self.client().write_setting(group, name, value)?; + self.sbp_client.write_setting(group, name, value)?; if matches!( setting.setting.kind, SettingKind::Float | SettingKind::Double ) { let new_setting = self - .client() + .sbp_client .read_setting(group, name) .ok_or_else(|| anyhow!("settting not found"))??; - self.settings.set(group, name, new_setting)?; + self.settings.lock().set(group, name, new_setting)?; } else { self.settings + .lock() .set(group, name, SettingValue::String(value.to_string()))?; } @@ -313,18 +332,16 @@ impl<'link, S: CapnProtoSender> SettingsTab<'link, S> { Ok(()) } - pub fn read_all_settings(&mut self) { - let results = self.client().read_all(); - for result in results { - let setting = match result { - Ok(setting) => setting, - Err(e) => { - error!("{}", e); - continue; - } - }; + fn read_all_settings(&self) { + const TIMEOUT: Duration = Duration::from_secs(15); - let current_setting = match self.settings.get_mut(&setting.group, &setting.name) { + let (settings, errors) = self.sbp_client.read_all_timeout(TIMEOUT); + for e in errors { + warn!("{}", e); + } + for setting in settings { + let mut settings = self.settings.lock(); + let current_setting = match settings.get_mut(&setting.group, &setting.name) { Ok(setting) => setting, Err(_) => { warn!( @@ -372,8 +389,9 @@ impl<'link, S: CapnProtoSender> SettingsTab<'link, S> { } /// Package settings table data into a message buffer and send to frontend. - pub fn send_table_data(&mut self) { - let groups = self.settings.groups(); + fn send_table_data(&self) { + let settings = self.settings.lock(); + let groups = settings.groups(); if groups.is_empty() { return; } @@ -460,20 +478,9 @@ impl<'link, S: CapnProtoSender> SettingsTab<'link, S> { } } self.client_sender + .lock() .send_data(serialize_capnproto_builder(builder)); } - - fn client<'a>(&'a self) -> MappedMutexGuard<'a, Client<'link>> { - let mut client = self.client.lock(); - if client.is_some() { - return MutexGuard::map(client, |c| c.as_mut().unwrap()); - } - let sender = self.msg_sender.clone(); - *client = Some(Client::new(self.link.clone(), move |msg| { - sender.send(msg).map_err(Into::into) - })); - MutexGuard::map(client, |c| c.as_mut().unwrap()) - } } #[derive(Debug, Clone)] @@ -502,6 +509,14 @@ impl Settings { } } + fn clear_values(&mut self) { + for group in self.inner.values_mut() { + for setting in group.values_mut() { + setting.value = None; + } + } + } + fn groups(&self) -> Vec> { self.inner.values().fold(Vec::new(), |mut groups, group| { let group: Vec<_> = group diff --git a/console_backend/src/shared_state.rs b/console_backend/src/shared_state.rs index a96a2ac0e..c7f2231d3 100644 --- a/console_backend/src/shared_state.rs +++ b/console_backend/src/shared_state.rs @@ -1,4 +1,5 @@ -use crate::common_constants::{self as cc, SbpLogging}; +use crate::common_constants::SbpLogging; +use crate::connection::ConnectionState; use crate::constants::{ APPLICATION_NAME, APPLICATION_ORGANIZATION, APPLICATION_QUALIFIER, CONNECTION_HISTORY_FILENAME, DEFAULT_LOG_DIRECTORY, MAX_CONNECTION_HISTORY, MPS, @@ -6,12 +7,12 @@ use crate::constants::{ use crate::errors::{CONVERT_TO_STR_FAILURE, SHARED_STATE_LOCK_MUTEX_FAILURE}; use crate::log_panel::LogLevel; use crate::output::{CsvLogging, CsvSerializer}; -use crate::piksi_tools_constants::*; use crate::settings_tab; use crate::solution_tab::LatLonUnits; use crate::types::CapnProtoSender; use crate::update_tab::UpdateTabUpdate; -use crate::utils::set_connected_frontend; +use crate::utils::send_conn_state; +use crate::watch::{WatchReceiver, Watched}; use anyhow::{Context, Result as AHResult}; use chrono::{DateTime, Utc}; use crossbeam::channel::Sender; @@ -44,39 +45,26 @@ impl SharedState { pub fn new() -> SharedState { SharedState(Arc::new(Mutex::new(SharedStateInner::default()))) } - pub fn is_running(&self) -> bool { + pub fn connection(&self) -> ConnectionState { let shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); - (*shared_data).running + (*shared_data).conn.get() } - pub fn set_running(&self, set_to: bool, mut client_send: S) + pub fn watch_connection(&self) -> WatchReceiver { + let shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); + (*shared_data).conn.watch() + } + pub fn set_connection(&self, conn: ConnectionState, client_send: &mut S) where S: CapnProtoSender, { - if set_to { - set_connected_frontend(cc::ApplicationStates::CONNECTED, &mut client_send); - } else { - set_connected_frontend(cc::ApplicationStates::DISCONNECTED, &mut client_send); - self.set_current_connection(EMPTY_STR.to_string()); + { + let shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); + if let ConnectionState::Connected { stop_token, .. } = shared_data.conn.get() { + stop_token.stop(); + } + shared_data.conn.send(conn.clone()); } - let mut shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); - (*shared_data).running = set_to; - } - pub fn is_server_running(&self) -> bool { - let shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); - (*shared_data).server_running - } - pub fn stop_server_running(&self) { - let mut shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); - (*shared_data).running = false; - (*shared_data).server_running = false; - } - pub fn is_paused(&self) -> bool { - let shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); - (*shared_data).paused - } - pub fn set_paused(&self, set_to: bool) { - let mut shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); - (*shared_data).paused = set_to; + send_conn_state(conn, client_send); } pub fn debug(&self) -> bool { let shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); @@ -238,47 +226,75 @@ impl SharedState { let mut shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); (*shared_data).update_tab_sender = Some(sender); } - pub fn settings_needs_update(&self) -> bool { - self.lock() - .expect(SHARED_STATE_LOCK_MUTEX_FAILURE) - .settings_tab - .needs_update() + pub fn watch_settings_state(&self) -> WatchReceiver { + let shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); + shared_data.settings_tab.watch() } - pub fn take_settings_state(&self) -> SettingsTabState { - let mut shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); - shared_data.settings_tab.take() + pub fn set_settings_state(&self, state: SettingsTabState) { + let shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); + shared_data.settings_tab.send(state); } - pub fn set_settings_refresh(&self, set_to: bool) { + pub fn reset_settings_state(&self) { let mut shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); - shared_data.settings_tab.refresh = set_to; + shared_data.settings_tab = Watched::new(SettingsTabState::new()); } - pub fn set_settings_save(&self, set_to: bool) { - let mut shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); - shared_data.settings_tab.save = set_to; + pub fn set_settings_refresh(&self, refresh: bool) { + let shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); + let data = shared_data.settings_tab.get(); + shared_data + .settings_tab + .send(SettingsTabState { refresh, ..data }); } - pub fn set_settings_reset(&self, set_to: bool) { - let mut shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); - shared_data.settings_tab.reset = set_to; + pub fn set_settings_save(&self, save: bool) { + let shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); + let data = shared_data.settings_tab.get(); + shared_data + .settings_tab + .send(SettingsTabState { save, ..data }); } - pub fn set_settings_confirm_ins_change(&self, set_to: bool) { - let mut shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); - shared_data.settings_tab.confirm_ins_change = set_to; + pub fn set_settings_reset(&self, reset: bool) { + let shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); + let data = shared_data.settings_tab.get(); + shared_data + .settings_tab + .send(SettingsTabState { reset, ..data }); } - pub fn set_settings_auto_survey_request(&self, set_to: bool) { - let mut shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); - shared_data.settings_tab.auto_survey_request = set_to; + pub fn set_settings_auto_survey_request(&self, auto_survey_request: bool) { + let shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); + let data = shared_data.settings_tab.get(); + shared_data.settings_tab.send(SettingsTabState { + auto_survey_request, + ..data + }); } - pub fn set_export_settings(&self, path: Option) { - let mut shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); - shared_data.settings_tab.export = path; + pub fn set_settings_confirm_ins_change(&self, confirm_ins_change: bool) { + let shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); + let data = shared_data.settings_tab.get(); + shared_data.settings_tab.send(SettingsTabState { + confirm_ins_change, + ..data + }); } - pub fn set_import_settings(&self, path: Option) { - let mut shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); - shared_data.settings_tab.import = path; + pub fn set_export_settings(&self, export: Option) { + let shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); + let data = shared_data.settings_tab.get(); + shared_data + .settings_tab + .send(SettingsTabState { export, ..data }); } - pub fn set_write_setting(&self, setting: Option) { - let mut shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); - shared_data.settings_tab.write = setting; + pub fn set_import_settings(&self, import: Option) { + let shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); + let data = shared_data.settings_tab.get(); + shared_data + .settings_tab + .send(SettingsTabState { import, ..data }); + } + pub fn set_write_setting(&self, write: Option) { + let shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); + let data = shared_data.settings_tab.get(); + shared_data + .settings_tab + .send(SettingsTabState { write, ..data }); } pub fn console_version(&self) -> String { let shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); @@ -352,16 +368,15 @@ pub struct SharedStateInner { pub(crate) logging_bar: LoggingBarState, pub(crate) log_panel: LogPanelState, pub(crate) tracking_tab: TrackingTabState, - pub(crate) paused: bool, pub(crate) connection_history: ConnectionHistory, - pub(crate) running: bool, + pub(crate) conn: Watched, pub(crate) debug: bool, pub(crate) server_running: bool, pub(crate) solution_tab: SolutionTabState, pub(crate) baseline_tab: BaselineTabState, pub(crate) advanced_spectrum_analyzer_tab: AdvancedSpectrumAnalyzerTabState, pub(crate) update_tab_sender: Option>>, - pub(crate) settings_tab: SettingsTabState, + pub(crate) settings_tab: Watched, pub(crate) console_version: String, pub(crate) firmware_version: Option, pub(crate) dgnss_enabled: bool, @@ -378,16 +393,15 @@ impl SharedStateInner { logging_bar: LoggingBarState::new(log_directory), log_panel: LogPanelState::new(), tracking_tab: TrackingTabState::new(), - paused: false, debug: false, connection_history, - running: false, + conn: Watched::new(ConnectionState::Disconnected), server_running: true, solution_tab: SolutionTabState::new(), baseline_tab: BaselineTabState::new(), advanced_spectrum_analyzer_tab: AdvancedSpectrumAnalyzerTabState::new(), update_tab_sender: None, - settings_tab: SettingsTabState::new(), + settings_tab: Watched::new(SettingsTabState::new()), console_version: String::from(include_str!("version.txt").trim()), firmware_version: None, dgnss_enabled: false, @@ -593,7 +607,7 @@ impl AutoSurveyData { } } -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct SettingsTabState { pub refresh: bool, pub reset: bool, @@ -609,21 +623,6 @@ impl SettingsTabState { fn new() -> Self { Default::default() } - - fn take(&mut self) -> SettingsTabState { - std::mem::take(self) - } - - fn needs_update(&self) -> bool { - self.refresh - || self.reset - || self.save - || self.confirm_ins_change - || self.auto_survey_request - || self.export.is_some() - || self.import.is_some() - || self.write.is_some() - } } // Navbar Types. diff --git a/console_backend/src/types.rs b/console_backend/src/types.rs index 0f405ae95..e7a7e9ec3 100644 --- a/console_backend/src/types.rs +++ b/console_backend/src/types.rs @@ -257,7 +257,9 @@ impl ClientSender { } impl CapnProtoSender for ClientSender { fn send_data(&mut self, msg_bytes: Vec) { - self.inner.send(msg_bytes).unwrap(); + if self.connected.get() { + let _ = self.inner.send(msg_bytes); + } } } diff --git a/console_backend/src/utils.rs b/console_backend/src/utils.rs index 3cd4d595c..55dcdd3a7 100644 --- a/console_backend/src/utils.rs +++ b/console_backend/src/utils.rs @@ -11,11 +11,11 @@ use sbp::SbpString; use semver::{Version, VersionReq}; //BuildMetadata, Prerelease, use serialport::available_ports; +use crate::connection::ConnectionState; use crate::constants::*; use crate::errors::*; -use crate::shared_state::SerialConfig; +use crate::shared_state::{SerialConfig, SharedState}; use crate::types::{CapnProtoSender, SignalCodes}; -use crate::{common_constants as cc, shared_state::SharedState}; /// Compare to semvar strings and return true if the later_version is greater than the early version. /// @@ -59,21 +59,8 @@ pub fn fixed_sbp_string(data: &str) -> SbpString<[u8; L], T> SbpString::new(arr) } -/// Send a CLOSE, or kill, signal to the frontend. -pub fn close_frontend(client_send: &mut P) { - let mut builder = Builder::new_default(); - let msg = builder.init_root::(); - let mut status = msg.init_status(); - let app_state = cc::ApplicationStates::CLOSE; - status.set_text(&app_state.to_string()); - client_send.send_data(serialize_capnproto_builder(builder)); -} - -/// Send a CONNECTED or DISCONNECTED, signal to the frontend. -pub fn set_connected_frontend( - app_state: cc::ApplicationStates, - client_send: &mut P, -) { +/// Notify the frontend of an [ConnectionState] change. +pub fn send_conn_state(app_state: ConnectionState, client_send: &mut P) { let mut builder = Builder::new_default(); let msg = builder.init_root::(); let mut status = msg.init_status(); diff --git a/console_backend/src/watch.rs b/console_backend/src/watch.rs new file mode 100644 index 000000000..e1cd9def5 --- /dev/null +++ b/console_backend/src/watch.rs @@ -0,0 +1,316 @@ +use std::fmt; +use std::sync::atomic::AtomicUsize; +use std::sync::Weak; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; + +use parking_lot::{Condvar, Mutex}; + +pub struct Watched { + shared: Arc>, +} + +impl Watched { + pub fn new(value: T) -> Watched { + Watched { + shared: Arc::new(Shared { + data: Mutex::new(Value { value, version: 1 }), + on_update: Condvar::new(), + closed: AtomicBool::new(false), + senders: AtomicUsize::new(1), + }), + } + } + + pub fn get(&self) -> T { + self.shared.data.lock().value.clone() + } + + pub fn send(&self, value: T) { + { + let mut data = self.shared.data.lock(); + data.value = value; + data.version = data.version.wrapping_add(1); + } + self.shared.on_update.notify_all(); + } + + pub fn watch(&self) -> WatchReceiver { + let version = { + let data = self.shared.data.lock(); + data.version + }; + WatchReceiver { + shared: Arc::downgrade(&self.shared), + last_seen: version.wrapping_sub(1), + } + } + + pub fn close(&self) { + self.shared.close(); + } +} + +impl Clone for Watched { + fn clone(&self) -> Self { + self.shared.senders.fetch_add(1, Ordering::SeqCst); + Self { + shared: Arc::clone(&self.shared), + } + } +} + +impl Default for Watched { + fn default() -> Self { + Watched::new(Default::default()) + } +} + +impl fmt::Debug for Watched { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Watched").finish() + } +} + +impl Drop for Watched { + fn drop(&mut self) { + // if we are the last sender close the channel so things calling + // `.wait()` will terminate + if self.shared.senders.fetch_sub(1, Ordering::SeqCst) == 1 { + self.shared.close(); + } + } +} + +pub struct WatchReceiver { + shared: Weak>, + last_seen: u64, +} + +impl WatchReceiver { + pub fn get(&mut self) -> Result { + let shared = Shared::upgrade(&self.shared)?; + let data = shared.data.lock(); + self.last_seen = data.version; + Ok(data.value.clone()) + } + + pub fn get_if_new(&mut self) -> Result, RecvError> { + let shared = Shared::upgrade(&self.shared)?; + let data = shared.data.lock(); + if self.last_seen == data.version { + Ok(None) + } else { + self.last_seen = data.version; + Ok(Some(data.value.clone())) + } + } + + pub fn wait(&mut self) -> Result { + let shared = Shared::upgrade(&self.shared)?; + let mut data = shared.data.lock(); + while data.version == self.last_seen { + shared.on_update.wait(&mut data); + if shared.is_closed() { + return Err(RecvError); + } + } + self.last_seen = data.version; + Ok(data.value.clone()) + } + + pub fn wait_until(&mut self, mut f: F) -> Result + where + F: FnMut(&T) -> bool, + { + loop { + let v = self.wait()?; + if f(&v) { + return Ok(v); + } + } + } + + pub fn wait_while(&mut self, mut f: F) -> Result + where + F: FnMut(&T) -> bool, + { + self.wait_until(|v| !f(v)) + } +} + +impl Clone for WatchReceiver { + fn clone(&self) -> WatchReceiver { + WatchReceiver { + shared: Weak::clone(&self.shared), + last_seen: self.last_seen, + } + } +} + +struct Shared { + data: Mutex>, + on_update: Condvar, + closed: AtomicBool, + senders: AtomicUsize, +} + +impl Shared { + fn close(&self) { + self.closed.store(true, Ordering::SeqCst); + self.on_update.notify_all(); + } + + fn is_closed(&self) -> bool { + self.closed.load(Ordering::SeqCst) + } + + fn upgrade(me: &Weak) -> Result, RecvError> { + let shared = me.upgrade().ok_or(RecvError)?; + if shared.is_closed() { + Err(RecvError) + } else { + Ok(shared) + } + } +} + +struct Value { + value: T, + version: u64, +} + +#[derive(Debug)] +pub struct RecvError; + +impl fmt::Display for RecvError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "watched value dropped") + } +} + +impl std::error::Error for RecvError {} + +#[cfg(test)] +mod tests { + use std::{thread, time::Duration}; + + use crossbeam::channel; + + use super::*; + + #[test] + fn starts_unseen() { + let watched = Watched::new(0); + let mut recv = watched.watch(); + assert_eq!(recv.get_if_new().unwrap(), Some(0)); + } + + #[test] + fn wait() { + let watched = Watched::new(0); + let mut recv = watched.watch(); + + assert_eq!(watched.get(), 0); + assert_eq!(recv.get().unwrap(), 0); + + watched.send(1); + assert_eq!(watched.get(), 1); + + let send_thread = thread::spawn(move || { + watched.send(2); + watched + }); + recv.wait().unwrap(); + + let watched = send_thread.join().unwrap(); + let recv_thread = thread::spawn(move || { + recv.wait().unwrap(); + recv + }); + watched.send(3); + + let mut recv = recv_thread.join().unwrap(); + assert_eq!(recv.get().unwrap(), 3); + assert_eq!(watched.get(), 3); + } + + #[test] + fn wait_while() { + let watched = Watched::new(0); + let mut recv = watched.watch(); + let (s, r) = channel::bounded(0); + thread::spawn(move || { + let v = recv.wait_while(|v| *v < 2).unwrap(); + assert_eq!(v, 2); + s.send(()).unwrap() + }); + + thread::sleep(Duration::from_secs(1)); + assert!(r.try_recv().is_err()); + + watched.send(1); + thread::sleep(Duration::from_secs(1)); + assert!(r.try_recv().is_err()); + + watched.send(2); + thread::sleep(Duration::from_secs(1)); + assert!(r.try_recv().is_ok()); + } + + #[test] + fn disconnect_watch() { + let watched = Watched::new(0); + let mut recv = watched.watch(); + // mark first as seen + let _ = recv.get(); + thread::spawn(move || { + thread::sleep(Duration::from_secs(1)); + drop(watched); + }); + assert!(recv.wait().is_err()); + } + + #[test] + fn multiple_consumers() { + let watched = Watched::new(0); + let mut recv1 = watched.watch(); + let mut recv2 = watched.watch(); + + watched.send(1); + assert_eq!(recv1.get_if_new().unwrap(), Some(1)); + assert_eq!(recv2.get_if_new().unwrap(), Some(1)); + + watched.send(2); + assert_eq!(recv1.get_if_new().unwrap(), Some(2)); + + watched.send(3); + assert_eq!(recv1.get_if_new().unwrap(), Some(3)); + assert_eq!(recv2.get_if_new().unwrap(), Some(3)); + + drop(watched); + assert!(recv1.wait().is_err()); + assert!(recv2.wait().is_err()); + } + + #[test] + fn clone_recv() { + let watched = Watched::new(0); + let mut recv1 = watched.watch(); + + watched.send(1); + + let mut recv2 = recv1.clone(); + assert_eq!(recv2.get_if_new().unwrap(), Some(1)); + + let mut recv3 = recv2.clone(); + assert_eq!(recv3.get_if_new().unwrap(), None); + assert_eq!(recv1.get_if_new().unwrap(), Some(1)); + + watched.send(2); + assert_eq!(recv1.get_if_new().unwrap(), Some(2)); + assert_eq!(recv2.get_if_new().unwrap(), Some(2)); + } +} diff --git a/console_backend/tests/mem_benches.rs b/console_backend/tests/mem_benches.rs index 543f7e746..a191a9563 100644 --- a/console_backend/tests/mem_benches.rs +++ b/console_backend/tests/mem_benches.rs @@ -1,19 +1,13 @@ #[cfg(feature = "benches")] mod mem_bench_impl { + use std::{error::Error, result::Result, thread}; + use crossbeam::channel; use ndarray::{ArrayView, Axis, Dim}; - use std::{ - error::Error, - result::Result, - sync::{Arc, Mutex}, - thread, - }; - use sysinfo::{get_current_pid, ProcessExt, System, SystemExt}; use console_backend::{ - connection::Connection, - process_messages, + connection::{ConnectionManager, ConnectionState}, shared_state::SharedState, types::{ClientSender, RealtimeDelay}, }; @@ -50,12 +44,14 @@ mod mem_bench_impl { /// - mean - std >= absolute_mean #[test] fn test_run_process_messages() { - let (client_recv_tx, client_recv_rx) = channel::unbounded::>>(); let pid = get_current_pid().unwrap(); println!("PID: {}", pid); - let is_running = Arc::new(Mutex::new(true)); - let is_running_recv = Arc::clone(&is_running); - let is_running_mem = Arc::clone(&is_running); + + let (client_send, client_recv) = channel::unbounded::>(); + let (mem_stop_send, mem_stop_recv) = channel::bounded(1); + let shared_state = SharedState::new(); + shared_state.set_debug(true); + let mem_read_thread = thread::spawn(move || { let mut sys = System::new(); let mut mem_readings_kb: Vec = vec![]; @@ -65,46 +61,42 @@ mod mem_bench_impl { let proc = sys.get_process(pid).unwrap(); mem_readings_kb.push(proc.memory() as f32); cpu_readings.push(proc.cpu_usage()); - let is_running_mem = is_running_mem.lock().unwrap(); - if !*is_running_mem { + if mem_stop_recv.try_recv().is_ok() { break; } } validate_memory_benchmark(&mem_readings_kb, &cpu_readings); }); + + let recv_state = shared_state.clone(); let recv_thread = thread::spawn(move || { - let client_recv = client_recv_rx.recv().unwrap(); let mut iter_count = 0; - loop { - if client_recv.recv().is_err() { + if client_recv.recv().is_err() || recv_state.connection().is_disconnected() { break; } - iter_count += 1; } assert!(iter_count > 0); - let mut is_running_recv = is_running_recv.lock().unwrap(); - *is_running_recv = false; + mem_stop_send.send(()).unwrap(); }); - { - let (client_send_, client_recv) = channel::unbounded::>(); - client_recv_tx - .send(client_recv) - .expect("sending client recv handle should succeed"); + let mut conn_watch = shared_state.watch_connection(); + let client_send = ClientSender::new(client_send); + let conn_manager = ConnectionManager::new(client_send, shared_state); - let client_send = ClientSender::new(client_send_); - let shared_state = SharedState::new(); - shared_state.set_running(true, client_send.clone()); - shared_state.set_debug(true); - let conn = Connection::file( - BENCH_FILEPATH.into(), - RealtimeDelay::On, - /*close_when_done=*/ true, - ); - process_messages::process_messages(conn, shared_state, client_send).unwrap(); - } + conn_manager.connect_to_file( + BENCH_FILEPATH.into(), + RealtimeDelay::On, + /*close_when_done=*/ true, + ); + conn_watch + .wait_while(ConnectionState::is_disconnected) + .unwrap(); + conn_watch + .wait_while(ConnectionState::is_connected) + .unwrap(); + drop(conn_manager); recv_thread.join().expect("join should succeed"); mem_read_thread.join().expect("join should succeed"); } @@ -115,8 +107,8 @@ mod mem_bench_impl { /// - `mem_readings`: The vector containing all memory readings acquired during benchmark. /// - `cpu_readings`: The vector containing all cpu percentage readings acquired during benchmark. fn validate_memory_benchmark(mem_readings: &[f32], cpu_readings: &[f32]) { - let mems = vec_1d_to_array(&mem_readings).unwrap(); - let cpus = vec_1d_to_array(&cpu_readings).unwrap(); + let mems = vec_1d_to_array(mem_readings).unwrap(); + let cpus = vec_1d_to_array(cpu_readings).unwrap(); assert!( mem_readings.len() >= MINIMUM_MEM_READINGS, "This benchmark requires {} samples to be collected for analysis and only {} samples were collected.", diff --git a/resources/NavBar.qml b/resources/NavBar.qml index d27f8b866..38ecf7f86 100644 --- a/resources/NavBar.qml +++ b/resources/NavBar.qml @@ -18,6 +18,7 @@ Rectangle { property variant previous_ports: [] property variant previous_files: [] property variant log_level_labels: [] + property string conn_state: "DISCONNECTED" property variant previous_serial_configs: [] property variant last_used_serial_device: null @@ -247,40 +248,14 @@ Rectangle { } - Button { - id: connectionPauseButton - - Layout.preferredWidth: Constants.navBar.connectionPauseWidth - Layout.preferredHeight: Constants.navBar.buttonHeight - ToolTip.visible: hovered - ToolTip.text: !checked ? "Pause" : "Unpause" - checkable: true - onClicked: data_model.pause(checked) - - Image { - id: connectionPauseImage - - anchors.centerIn: parent - width: Constants.navBar.buttonSvgHeight - height: Constants.navBar.buttonSvgHeight - source: Constants.icons.pauseButtonUrl - visible: false - } - - ColorOverlay { - anchors.fill: connectionPauseImage - source: connectionPauseImage - color: !connectionPauseButton.checked ? Constants.materialGrey : Constants.swiftOrange - } - - } - Button { id: connectButton Layout.preferredWidth: Constants.navBar.connectButtonWidth Layout.preferredHeight: Constants.navBar.buttonHeight checkable: true + checked: true + enabled: conn_state == "DISCONNECTED" || conn_state == "CONNECTED" ToolTip.visible: hovered ToolTip.text: !checked ? "Connect" : "Disconnect" onClicked: { @@ -390,7 +365,6 @@ Rectangle { previous_ports = navBarData.previous_ports; previous_files = navBarData.previous_files; previous_serial_configs = navBarData.previous_serial_configs; - connectButton.checked = navBarData.connected; logLevelButton.currentIndex = log_level_labels.indexOf(navBarData.log_level); if (!last_used_serial_device && navBarData.last_used_serial_device) { // Set the default selected to the last used @@ -400,6 +374,8 @@ Rectangle { restore_previous_serial_settings(available_devices[serialDevice.currentIndex]); } + conn_state = navBarData.conn_state; + connectButton.checked = conn_state == "CONNECTED"; } } diff --git a/src/main/resources/base/console_backend.capnp b/src/main/resources/base/console_backend.capnp index 680df42eb..83f0266a4 100644 --- a/src/main/resources/base/console_backend.capnp +++ b/src/main/resources/base/console_backend.capnp @@ -19,10 +19,6 @@ struct SerialRefreshRequest { refresh @0 :Void = void; } -struct PauseRequest { - pause @0 :Bool; -} - struct DisconnectRequest { disconnect @0 :Void = void; } @@ -445,45 +441,44 @@ struct Message { tcpRequest @9 :TcpRequest; fileRequest @10 :FileRequest; serialRequest @11 :SerialRequest; - pauseRequest @12 :PauseRequest; - disconnectRequest @13 :DisconnectRequest; - navBarStatus @14 :NavBarStatus; - serialRefreshRequest @15 :SerialRefreshRequest; - logAppend @16 :LogAppend; - observationStatus @17 :ObservationStatus; - statusBarStatus @18 :StatusBarStatus; - loggingBarFront @19 :LoggingBarFront; - loggingBarStatus @20 :LoggingBarStatus; - logLevelFront @21 :LogLevelFront; - advancedInsStatus @22 :AdvancedInsStatus; - fusionStatusFlagsStatus @23 :FusionStatusFlagsStatus; - advancedMagnetometerStatus @24 :AdvancedMagnetometerStatus; - baselinePlotStatus @25 :BaselinePlotStatus; - baselineTableStatus @26 :BaselineTableStatus; - baselinePlotStatusButtonFront @27 :BaselinePlotStatusButtonFront; - advancedSpectrumAnalyzerStatus @28:AdvancedSpectrumAnalyzerStatus; - advancedSpectrumAnalyzerStatusFront @29:AdvancedSpectrumAnalyzerStatusFront; - updateTabStatus @30:UpdateTabStatus; - updateTabStatusFront @31:UpdateTabStatusFront; - settingsTableStatus @32 :SettingsTableStatus; - settingsRefreshRequest @33 :SettingsRefreshRequest; - settingsExportRequest @34 :SettingsExportRequest; - settingsImportRequest @35 :SettingsImportRequest; - settingsImportResponse @36 :SettingsImportResponse; - settingsWriteRequest @37 :SettingsWriteRequest; - settingsResetRequest @38 :SettingsResetRequest; - settingsSaveRequest @39 :SettingsSaveRequest; - advancedSystemMonitorStatus @40 :AdvancedSystemMonitorStatus; - threadState @41 :ThreadState; - uartState @42 :UartState; - advancedSystemMonitorStatusFront @43 :AdvancedSystemMonitorStatusFront; - skyPlotObs @44 :SkyPlotObs; - trackingSkyPlotStatus @45 :TrackingSkyPlotStatus; - advancedNetworkingStatus @46 :AdvancedNetworkingStatus; - networkState @47 :NetworkState; - advancedNetworkingStatusFront @48 :AdvancedNetworkingStatusFront; - insSettingsChangeResponse @49 : InsSettingsChangeResponse; - confirmInsChange @50 : ConfirmInsChange; - autoSurveyRequest @51 : AutoSurveyRequest; + disconnectRequest @12 :DisconnectRequest; + navBarStatus @13 :NavBarStatus; + serialRefreshRequest @14 :SerialRefreshRequest; + logAppend @15 :LogAppend; + observationStatus @16 :ObservationStatus; + statusBarStatus @17 :StatusBarStatus; + loggingBarFront @18 :LoggingBarFront; + loggingBarStatus @19 :LoggingBarStatus; + logLevelFront @20 :LogLevelFront; + advancedInsStatus @21 :AdvancedInsStatus; + fusionStatusFlagsStatus @22 :FusionStatusFlagsStatus; + advancedMagnetometerStatus @23 :AdvancedMagnetometerStatus; + baselinePlotStatus @24 :BaselinePlotStatus; + baselineTableStatus @25 :BaselineTableStatus; + baselinePlotStatusButtonFront @26 :BaselinePlotStatusButtonFront; + advancedSpectrumAnalyzerStatus @27:AdvancedSpectrumAnalyzerStatus; + advancedSpectrumAnalyzerStatusFront @28:AdvancedSpectrumAnalyzerStatusFront; + updateTabStatus @29:UpdateTabStatus; + updateTabStatusFront @30:UpdateTabStatusFront; + settingsTableStatus @31 :SettingsTableStatus; + settingsRefreshRequest @32 :SettingsRefreshRequest; + settingsExportRequest @33 :SettingsExportRequest; + settingsImportRequest @34 :SettingsImportRequest; + settingsImportResponse @35 :SettingsImportResponse; + settingsWriteRequest @36 :SettingsWriteRequest; + settingsResetRequest @37 :SettingsResetRequest; + settingsSaveRequest @38 :SettingsSaveRequest; + advancedSystemMonitorStatus @39 :AdvancedSystemMonitorStatus; + threadState @40 :ThreadState; + uartState @41 :UartState; + advancedSystemMonitorStatusFront @42 :AdvancedSystemMonitorStatusFront; + skyPlotObs @43 :SkyPlotObs; + trackingSkyPlotStatus @44 :TrackingSkyPlotStatus; + advancedNetworkingStatus @45 :AdvancedNetworkingStatus; + networkState @46 :NetworkState; + advancedNetworkingStatusFront @47 :AdvancedNetworkingStatusFront; + insSettingsChangeResponse @48 : InsSettingsChangeResponse; + confirmInsChange @49 : ConfirmInsChange; + autoSurveyRequest @50 : AutoSurveyRequest; } } diff --git a/swiftnav_console/constants.py b/swiftnav_console/constants.py index 720472a15..58b304d83 100644 --- a/swiftnav_console/constants.py +++ b/swiftnav_console/constants.py @@ -71,7 +71,7 @@ class Keys(str, Enum): PREVIOUS_HOSTS = "PREVIOUS_HOSTS" PREVIOUS_PORTS = "PREVIOUS_PORTS" PREVIOUS_FILES = "PREVIOUS_FILES" - CONNECTED = "CONNECTED" + CONNECTION_STATE = "CONNECTION_STATE" PORT = "PORT" POS = "POS" RTK = "RTK" @@ -134,8 +134,8 @@ class Keys(str, Enum): PREVIOUS_SERIAL_CONFIGS = "PREVIOUS_SERIAL_CONFIGS" -class ApplicationStates(str, Enum): - CLOSE = "CLOSE" +class ConnectionState(str, Enum): + CLOSED = "CLOSED" CONNECTED = "CONNECTED" DISCONNECTED = "DISCONNECTED" diff --git a/swiftnav_console/main.py b/swiftnav_console/main.py index 94cc5a10e..bec51936a 100644 --- a/swiftnav_console/main.py +++ b/swiftnav_console/main.py @@ -25,7 +25,7 @@ import console_backend.server # type: ignore # pylint: disable=import-error,no-name-in-module -from .constants import ApplicationMetadata, ApplicationStates, Keys, Tabs, QTKeys +from .constants import ApplicationMetadata, ConnectionState, Keys, Tabs, QTKeys from .log_panel import ( LOG_PANEL, @@ -230,12 +230,10 @@ def receive_messages(app_, backend, messages): Message = messages.Message m = Message.from_bytes(buffer) if m.which == Message.Union.Status: - if m.status.text == ApplicationStates.CLOSE: + app_state = ConnectionState(m.status.text) + if app_state == ConnectionState.CLOSED: return app_.quit() - if m.status.text == ApplicationStates.CONNECTED: - NAV_BAR[Keys.CONNECTED] = True - elif m.status.text == ApplicationStates.DISCONNECTED: - NAV_BAR[Keys.CONNECTED] = False + NAV_BAR[Keys.CONNECTION_STATE] = app_state elif m.which == Message.Union.SolutionPositionStatus: SOLUTION_POSITION_TAB[Keys.POINTS][:] = [ diff --git a/swiftnav_console/nav_bar.py b/swiftnav_console/nav_bar.py index 846cef9f1..cdb177caf 100644 --- a/swiftnav_console/nav_bar.py +++ b/swiftnav_console/nav_bar.py @@ -5,14 +5,14 @@ from PySide2.QtCore import Property, QObject, Slot -from .constants import Keys, LogLevel, QTKeys +from .constants import Keys, LogLevel, QTKeys, ConnectionState NAV_BAR: Dict[str, Any] = { Keys.AVAILABLE_PORTS: [], Keys.AVAILABLE_BAUDRATES: [], Keys.AVAILABLE_FLOWS: [], Keys.AVAILABLE_REFRESH_RATES: [], - Keys.CONNECTED: False, + Keys.CONNECTION_STATE: ConnectionState.DISCONNECTED, Keys.PREVIOUS_HOSTS: [], Keys.PREVIOUS_PORTS: [], Keys.PREVIOUS_FILES: [], @@ -29,7 +29,7 @@ class NavBarData(QObject): # pylint: disable=too-many-instance-attributes disab _available_baudrates: List[str] = [] _available_flows: List[str] = [] _available_refresh_rates: List[str] = [] - _connected: bool = False + _conn_state: ConnectionState = ConnectionState.DISCONNECTED _previous_hosts: List[str] = [] _previous_ports: List[str] = [] _previous_files: List[str] = [] @@ -90,19 +90,19 @@ def set_available_refresh_rates(self, available_refresh_rates: List[str]) -> Non QTKeys.QVARIANTLIST, get_available_refresh_rates, set_available_refresh_rates # type: ignore ) - def get_connected(self) -> bool: - """Getter for _connected. + def get_conn_state(self) -> ConnectionState: + """Getter for _conn_state. Returns: - bool: Whether a connection is live or not. + ConnectionState: Whether a connection is live, disconnecting or disconnected. """ - return self._connected + return self._conn_state - def set_connected(self, connected: bool) -> None: - """Setter for _connected.""" - self._connected = connected + def set_conn_state(self, conn_state: ConnectionState) -> None: + """Setter for _conn_state.""" + self._conn_state = conn_state - connected = Property(bool, get_connected, set_connected) + conn_state = Property(str, get_conn_state, set_conn_state) def get_previous_hosts(self) -> List[str]: return self._previous_hosts @@ -154,7 +154,7 @@ def fill_data(self, cp: NavBarData) -> NavBarData: # pylint:disable=no-self-use cp.set_available_baudrates(NAV_BAR[Keys.AVAILABLE_BAUDRATES]) cp.set_available_flows(NAV_BAR[Keys.AVAILABLE_FLOWS]) cp.set_available_refresh_rates(NAV_BAR[Keys.AVAILABLE_REFRESH_RATES]) - cp.set_connected(NAV_BAR[Keys.CONNECTED]) + cp.set_conn_state(NAV_BAR[Keys.CONNECTION_STATE]) cp.set_previous_hosts(NAV_BAR[Keys.PREVIOUS_HOSTS]) cp.set_previous_ports(NAV_BAR[Keys.PREVIOUS_PORTS]) cp.set_previous_files(NAV_BAR[Keys.PREVIOUS_FILES])