From e5541f19798302e4946696eba4ea415669178254 Mon Sep 17 00:00:00 2001 From: John Michael Burke Date: Tue, 29 Jun 2021 14:22:56 -0700 Subject: [PATCH 1/6] More reliable conn logic/serverstate->nonarcmutex[CPP-251] --- console_backend/benches/cpu_benches.rs | 8 +- console_backend/src/bin/fileio.rs | 8 +- console_backend/src/cli_options.rs | 8 +- console_backend/src/constants.rs | 2 + console_backend/src/errors.rs | 4 + console_backend/src/process_messages.rs | 20 +- console_backend/src/server.rs | 87 +-- console_backend/src/types.rs | 688 ++++++++++++++---------- console_backend/tests/mem_benches.rs | 8 +- 9 files changed, 475 insertions(+), 358 deletions(-) diff --git a/console_backend/benches/cpu_benches.rs b/console_backend/benches/cpu_benches.rs index 5780a1d6a..f896b9ba6 100644 --- a/console_backend/benches/cpu_benches.rs +++ b/console_backend/benches/cpu_benches.rs @@ -61,8 +61,12 @@ fn run_process_messages(file_in_name: &str, failure: bool) { inner: client_send_, }; shared_state.set_running(true, client_send.clone()); - let conn = Connection::file(file_in_name.into()).unwrap(); - process_messages::process_messages(conn, shared_state, client_send, RealtimeDelay::Off); + let conn = Connection::file( + file_in_name.into(), + RealtimeDelay::Off, + /*close_when_done=*/ true, + ); + process_messages::process_messages(conn, shared_state, client_send).unwrap(); } recv_thread.join().expect("join should succeed"); } diff --git a/console_backend/src/bin/fileio.rs b/console_backend/src/bin/fileio.rs index 496898179..33266ae65 100644 --- a/console_backend/src/bin/fileio.rs +++ b/console_backend/src/bin/fileio.rs @@ -70,7 +70,7 @@ fn main() -> Result<()> { dest, input, } => { - let (rdr, wtr) = input.into_conn()?.into_io(); + let (rdr, wtr) = input.into_conn().try_connect(/*shared_state=*/ None)?; let sender = MsgSender::new(wtr); scope(|s| { s.spawn(|_| run(rdr)); @@ -88,7 +88,7 @@ fn main() -> Result<()> { dest, input, } => { - let (rdr, wtr) = input.into_conn()?.into_io(); + let (rdr, wtr) = input.into_conn().try_connect(/*shared_state=*/ None)?; let sender = MsgSender::new(wtr); scope(|s| { s.spawn(|_| run(rdr)); @@ -104,7 +104,7 @@ fn main() -> Result<()> { .unwrap() } Opts::List { path, input } => { - let (rdr, wtr) = input.into_conn()?.into_io(); + let (rdr, wtr) = input.into_conn().try_connect(/*shared_state=*/ None)?; let sender = MsgSender::new(wtr); scope(|s| { s.spawn(|_| run(rdr)); @@ -117,7 +117,7 @@ fn main() -> Result<()> { .unwrap() } Opts::Delete { path, input } => { - let (rdr, wtr) = input.into_conn()?.into_io(); + let (rdr, wtr) = input.into_conn().try_connect(/*shared_state=*/ None)?; let sender = MsgSender::new(wtr); scope(|s| { s.spawn(|_| run(rdr)); diff --git a/console_backend/src/cli_options.rs b/console_backend/src/cli_options.rs index e6e371e31..91b820996 100644 --- a/console_backend/src/cli_options.rs +++ b/console_backend/src/cli_options.rs @@ -177,7 +177,7 @@ pub enum Input { } impl Input { - pub fn into_conn(self) -> crate::types::Result { + pub fn into_conn(self) -> Connection { match self { Input::Tcp { host, port } => Connection::tcp(host, port), Input::Serial { @@ -185,7 +185,11 @@ impl Input { baudrate, flow_control, } => Connection::serial(serialport.to_string_lossy().into(), baudrate, flow_control), - Input::File { file_in } => Connection::file(file_in.to_string_lossy().into()), + Input::File { file_in } => Connection::file( + file_in.to_string_lossy().into(), + crate::types::RealtimeDelay::On, + /*close_when_done=*/ false, + ), } } } diff --git a/console_backend/src/constants.rs b/console_backend/src/constants.rs index c5d562ee9..3a3c91672 100644 --- a/console_backend/src/constants.rs +++ b/console_backend/src/constants.rs @@ -239,3 +239,5 @@ pub(crate) const UNKNOWN_ERROR: &str = "Unk Error"; pub(crate) const UNKNOWN_ERROR_SHORT: &str = "unk"; pub(crate) const ODO_POSTFIX: &str = "+Odo"; pub(crate) const INS_POSTFIX: &str = "+INS"; + +pub(crate) const SERVER_STATE_CONNECTION_LOOP_TIMEOUT_SEC: f64 = 1.0; diff --git a/console_backend/src/errors.rs b/console_backend/src/errors.rs index 44535169e..8ec8484d4 100644 --- a/console_backend/src/errors.rs +++ b/console_backend/src/errors.rs @@ -10,3 +10,7 @@ pub(crate) const GET_MUT_OBJECT_FAILURE: &str = "error trying to get mut object" pub(crate) const UNABLE_TO_STOP_TIMER_THREAD_FAILURE: &str = "unable to kill running timer thread"; pub(crate) const UNABLE_TO_SEND_INS_UPDATE_FAILURE: &str = "unable to send an ins status update"; pub(crate) const THREAD_JOIN_FAILURE: &str = "thread join failure"; +pub(crate) const TCP_CONNECTION_PARSING_FAILURE: &str = + "unable to parse the provided string for ip string"; +pub(crate) const SERVER_STATE_NEW_CONNECTION_FAILURE: &str = "server state new connection failure"; +pub(crate) const SERVER_STATE_DISCONNECT_FAILURE: &str = "server state disconnect failure"; diff --git a/console_backend/src/process_messages.rs b/console_backend/src/process_messages.rs index bc8276b0c..1e7ff9a4f 100644 --- a/console_backend/src/process_messages.rs +++ b/console_backend/src/process_messages.rs @@ -10,21 +10,25 @@ use crate::constants::PAUSE_LOOP_SLEEP_DURATION_MS; use crate::log_panel::handle_log_msg; use crate::main_tab::*; use crate::types::*; +use crate::utils::{close_frontend, refresh_navbar}; pub fn process_messages( conn: Connection, shared_state: SharedState, - client_send: S, - realtime_delay: RealtimeDelay, -) where + mut client_send: S, +) -> Result<()> +where S: MessageSender, { - let (rdr, _) = conn.into_io(); - let mut main = MainTab::new(shared_state.clone(), client_send); + let realtime_delay = conn.realtime_delay(); + let (rdr, _) = conn.try_connect(Some(shared_state.clone()))?; + shared_state.set_running(true, client_send.clone()); + shared_state.set_current_connection(conn.name()); + refresh_navbar(&mut client_send.clone(), shared_state.clone()); + let mut main = MainTab::new(shared_state.clone(), client_send.clone()); let messages = sbp::iter_messages(rdr) .log_errors(log::Level::Debug) .with_rover_time(); - for (message, gps_time) in messages { if !shared_state.is_running() { if let Err(e) = main.end_csv_logging() { @@ -183,4 +187,8 @@ pub fn process_messages( } log::logger().flush(); } + if conn.close_when_done() { + close_frontend(&mut client_send); + } + Ok(()) } diff --git a/console_backend/src/server.rs b/console_backend/src/server.rs index b4bec3525..04c715251 100644 --- a/console_backend/src/server.rs +++ b/console_backend/src/server.rs @@ -5,8 +5,7 @@ use pyo3::prelude::*; use pyo3::types::PyBytes; use async_logger_log::Logger; -use log::warn; - +use log::error; use std::{ io::{BufReader, Cursor}, path::PathBuf, @@ -21,7 +20,7 @@ use crate::constants::LOG_WRITER_BUFFER_MESSAGE_COUNT; use crate::errors::*; use crate::log_panel::{splitable_log_formatter, LogLevel, LogPanelWriter}; use crate::output::{CsvLogging, SbpLogging}; -use crate::types::{ClientSender, FlowControl, ServerState, SharedState}; +use crate::types::{ClientSender, FlowControl, RealtimeDelay, ServerState, SharedState}; use crate::utils::{refresh_loggingbar, refresh_navbar}; /// The backend server @@ -64,31 +63,15 @@ impl ServerEndpoint { /// - `server_state`: The Server state to start a specific connection. /// - `client_send`: Client Sender channel for communication from backend to frontend. /// - `shared_state`: The shared state for validating another connection is not already running. -fn handle_cli( - opt: CliOptions, - server_state: ServerState, - client_send: ClientSender, - shared_state: SharedState, -) { +fn handle_cli(opt: CliOptions, server_state: &ServerState, shared_state: SharedState) { if let Some(opt_input) = opt.input { match opt_input { Input::Tcp { host, port } => { - if let Err(e) = - server_state.connect_to_host(client_send, shared_state.clone(), host, port) - { - warn!("failed to connect over tcp: {}", e); - } + server_state.connect_to_host(host, port); } Input::File { file_in } => { let filename = file_in.display().to_string(); - if let Err(e) = server_state.connect_to_file( - client_send, - shared_state.clone(), - filename, - opt.exit_after, - ) { - warn!("failed to connect to file: {}", e); - } + server_state.connect_to_file(filename, RealtimeDelay::On, opt.exit_after); } Input::Serial { serialport, @@ -96,15 +79,7 @@ fn handle_cli( flow_control, } => { let serialport = serialport.display().to_string(); - if let Err(e) = server_state.connect_to_serial( - client_send, - shared_state.clone(), - serialport, - baudrate, - flow_control, - ) { - warn!("failed to connect over serial: {}", e); - } + server_state.connect_to_serial(serialport, baudrate, flow_control); } } } @@ -165,7 +140,7 @@ impl Server { inner: client_send_, }; let shared_state = SharedState::new(); - let server_state = ServerState::new(); + let server_state = ServerState::new(client_send.clone(), shared_state.clone()); let logger = Logger::builder() .buf_size(LOG_WRITER_BUFFER_MESSAGE_COUNT) @@ -177,12 +152,7 @@ impl Server { log::set_boxed_logger(Box::new(logger)).expect("Failed to set logger"); // Handle CLI Opts. - handle_cli( - opt, - server_state.clone(), - client_send.clone(), - shared_state.clone(), - ); + handle_cli(opt, &server_state, shared_state.clone()); refresh_navbar(&mut client_send.clone(), shared_state.clone()); refresh_loggingbar(&mut client_send.clone(), shared_state.clone()); thread::spawn(move || loop { @@ -200,7 +170,7 @@ impl Server { let message = match message.which() { Ok(msg) => msg, Err(e) => { - eprintln!("error reading message: {}", e); + error!("error reading message: {}", e); continue; } }; @@ -215,11 +185,10 @@ impl Server { let request = match request.which() { Ok(msg) => msg, Err(e) => { - eprintln!("error reading message: {}", e); + error!("error reading message: {}", e); continue; } }; - let server_state_clone = server_state.clone(); let shared_state_clone = shared_state.clone(); let client_send_clone = client_send.clone(); match request { @@ -227,23 +196,18 @@ impl Server { refresh_navbar(&mut client_send_clone.clone(), shared_state_clone); } m::message::DisconnectRequest(Ok(_)) => { - shared_state_clone.set_running(false, client_send_clone.clone()); - server_state_clone.connection_join(); - println!("Disconnected successfully."); + server_state.disconnect(client_send_clone.clone()); } m::message::FileRequest(Ok(req)) => { let filename = req .get_filename() .expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); let filename = filename.to_string(); - if let Err(e) = server_state_clone.connect_to_file( - client_send_clone, - shared_state_clone, + server_state.connect_to_file( filename, - /*close_when_done = */ false, - ) { - warn!("Failed to connect to file: {}", e); - } + RealtimeDelay::On, + /*close_when_done*/ false, + ); } m::message::PauseRequest(Ok(_)) => { if shared_state_clone.is_paused() { @@ -256,14 +220,7 @@ impl Server { let host = req.get_host().expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); let port = req.get_port(); - if let Err(e) = server_state_clone.connect_to_host( - client_send_clone, - shared_state_clone, - host.to_string(), - port, - ) { - warn!("Failed to connect over tcp: {}", e); - } + server_state.connect_to_host(host.to_string(), port); } m::message::SerialRequest(Ok(req)) => { let device = @@ -272,15 +229,7 @@ impl Server { let baudrate = req.get_baudrate(); let flow = req.get_flow_control().unwrap(); let flow = FlowControl::from_str(flow).unwrap(); - if let Err(e) = server_state_clone.connect_to_serial( - client_send_clone, - shared_state_clone, - device, - baudrate, - flow, - ) { - warn!("Failed to connect over serial: {}", e); - } + server_state.connect_to_serial(device, baudrate, flow); } _ => println!("err"), } @@ -375,7 +324,7 @@ impl Server { (*shared_data).baseline_tab.reset = cv_in.get_reset_filters(); } _ => { - eprintln!("unknown message from front-end"); + error!("unknown message from front-end"); } } } else { diff --git a/console_backend/src/types.rs b/console_backend/src/types.rs index 22ec7bd4d..014b7b7e5 100644 --- a/console_backend/src/types.rs +++ b/console_backend/src/types.rs @@ -5,9 +5,10 @@ use crate::log_panel::LogLevel; use crate::output::CsvLogging; use crate::piksi_tools_constants::*; use crate::process_messages::process_messages; -use crate::utils::{close_frontend, mm_to_m, ms_to_sec, refresh_navbar, set_connected_frontend}; +use crate::utils::{mm_to_m, ms_to_sec, set_connected_frontend}; use anyhow::{Context, Result as AHResult}; use chrono::{DateTime, Utc}; +use crossbeam::channel::{unbounded, Receiver, Sender}; use directories::{ProjectDirs, UserDirs}; use indexmap::set::IndexSet; use lazy_static::lazy_static; @@ -37,14 +38,16 @@ use std::{ fs, hash::Hash, io, - net::TcpStream, + net::{SocketAddr, TcpStream, ToSocketAddrs}, ops::Deref, - path::PathBuf, + path::{Path, PathBuf}, str::FromStr, sync::{ + self, atomic::{AtomicBool, Ordering::*}, - mpsc::Sender, - Arc, Mutex, + // mpsc::Sender, + Arc, + Mutex, }, thread, thread::JoinHandle, @@ -121,7 +124,7 @@ pub trait MessageSender: Debug + Clone + Send { #[derive(Debug, Clone)] pub struct ClientSender { - pub inner: Sender>, + pub inner: sync::mpsc::Sender>, } impl MessageSender for ClientSender { fn send_data(&mut self, msg_bytes: Vec) { @@ -169,143 +172,115 @@ impl Clone for ArcBool { } #[derive(Debug)] -pub struct ServerState(Arc>); - -impl Deref for ServerState { - type Target = Mutex; - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl Default for ServerState { - fn default() -> Self { - Self::new() - } +pub struct ServerState { + pub handler: Option>, + shared_state: SharedState, + sender: Sender>, + receiver: Receiver>, } +impl ServerState { + pub fn new(client_send: ClientSender, shared_state: SharedState) -> ServerState { + let (sender, receiver) = unbounded(); -impl Clone for ServerState { - fn clone(&self) -> Self { ServerState { - 0: Arc::clone(&self.0), + handler: Some(ServerState::connect_thread( + client_send, + shared_state.clone(), + receiver.clone(), + )), + shared_state, + sender, + receiver, } } -} -impl ServerState { - pub fn new() -> ServerState { - ServerState(Arc::new(Mutex::new(ServerStateInner::default()))) - } - pub fn new_connection(&self, new_thread: JoinHandle<()>) { - let mut shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); - (*shared_data).handler = Some(new_thread); - } - pub fn connection_join(&self) { - let mut shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); - let handler = &mut (*shared_data).handler; - if let Some(handle) = handler.take() { - handle.join().unwrap(); - } + fn connect_thread( + client_send: ClientSender, + shared_state: SharedState, + receiver: Receiver>, + ) -> JoinHandle<()> { + thread::spawn(move || { + let mut conn = None; + while shared_state.clone().is_server_running() { + if let Ok(conn_option) = receiver.recv_timeout(Duration::from_secs_f64( + SERVER_STATE_CONNECTION_LOOP_TIMEOUT_SEC, + )) { + match conn_option { + Some(conn_) => { + conn = Some(conn_); + } + None => { + conn = None; + info!("Disconnected successfully."); + } + } + } + if let Some(conn_) = conn.clone() { + if let Err(e) = + process_messages(conn_, shared_state.clone(), client_send.clone()) + { + error!("unable to process messages, {}", e); + } + shared_state.set_running(false, client_send.clone()); + } + } + }) } /// Helper function for attempting to open a file and process SBP messages from it. /// /// # Parameters - /// - `client_send`: Client Sender channel for communication from backend to frontend. - /// - `shared_state`: The shared state for validating another connection is not already running. /// - `filename`: The path to the filename to be read for SBP messages. pub fn connect_to_file( &self, - client_send: ClientSender, - shared_state: SharedState, filename: String, + realtime_delay: RealtimeDelay, close_when_done: bool, - ) -> Result<()> { - let conn = Connection::file(filename.clone())?; - info!("Opened file successfully!"); - shared_state.update_file_history(filename); - self.connect( - conn, - client_send, - shared_state, - RealtimeDelay::On, - close_when_done, - ); - Ok(()) + ) { + let conn = Connection::file(filename, realtime_delay, close_when_done); + self.connect(conn); } /// Helper function for attempting to open a tcp connection and process SBP messages from it. /// /// # Parameters - /// - `client_send`: Client Sender channel for communication from backend to frontend. - /// - `shared_state`: The shared state for validating another connection is not already running. /// - `host`: The host portion of the TCP stream to open. /// - `port`: The port to be used to open a TCP stream. - pub fn connect_to_host( - &self, - client_send: ClientSender, - shared_state: SharedState, - host: String, - port: u16, - ) -> Result<()> { - let conn = Connection::tcp(host.clone(), port)?; - info!("Connected to tcp stream!"); - shared_state.update_tcp_history(host, port); - self.connect(conn, client_send, shared_state, RealtimeDelay::Off, false); - Ok(()) + pub fn connect_to_host(&self, host: String, port: u16) { + let conn = Connection::tcp(host, port); + self.connect(conn); } /// Helper function for attempting to open a serial port and process SBP messages from it. /// /// # Parameters - /// - `client_send`: Client Sender channel for communication from backend to frontend. - /// - `shared_state`: The shared state for validating another connection is not already running. /// - `device`: The string path corresponding to the serial device to connect with. /// - `baudrate`: The baudrate to use when communicating with the serial device. /// - `flow`: The flow control mode to use when communicating with the serial device. - pub fn connect_to_serial( - &self, - client_send: ClientSender, - shared_state: SharedState, - device: String, - baudrate: u32, - flow: FlowControl, - ) -> Result<()> { - let conn = Connection::serial(device, baudrate, flow)?; - info!("Connected to serialport!"); - self.connect(conn, client_send, shared_state, RealtimeDelay::Off, false); - Ok(()) + pub fn connect_to_serial(&self, device: String, baudrate: u32, flow: FlowControl) { + let conn = Connection::serial(device, baudrate, flow); + self.connect(conn); } - fn connect( - &self, - conn: Connection, - mut client_send: ClientSender, - shared_state: SharedState, - delay: RealtimeDelay, - close_when_done: bool, - ) { - shared_state.set_current_connection(conn.name.clone()); - shared_state.set_running(true, client_send.clone()); - self.connection_join(); - self.new_connection(thread::spawn(move || { - refresh_navbar(&mut client_send, shared_state.clone()); - process_messages(conn, shared_state.clone(), client_send.clone(), delay); - if close_when_done { - close_frontend(&mut client_send); - } - shared_state.set_running(false, client_send); - })); + /// Send disconnect signal to server state loop. + pub fn disconnect(&self, client_send: S) { + self.shared_state.set_running(false, client_send); + if let Err(err) = self.sender.try_send(None) { + error!("{}, {}", SERVER_STATE_DISCONNECT_FAILURE, err); + } } -} -#[derive(Debug, Default)] -pub struct ServerStateInner { - pub handler: Option>, + /// Helper function to send connection object to server state loop. + fn connect(&self, conn: Connection) { + if let Err(err) = self.sender.try_send(Some(conn)) { + error!("{}, {}", SERVER_STATE_NEW_CONNECTION_FAILURE, err); + } + } } -impl ServerStateInner { - pub fn new() -> ServerStateInner { - ServerStateInner { handler: None } + +impl Drop for ServerState { + fn drop(&mut self) { + self.shared_state.stop_server_running(); } } @@ -320,7 +295,10 @@ impl SharedState { let shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); (*shared_data).running } - pub fn set_running(&self, set_to: bool, mut client_send: ClientSender) { + pub fn set_running(&self, set_to: bool, mut client_send: S) + where + S: MessageSender, + { if set_to { set_connected_frontend(cc::ApplicationStates::CONNECTED, &mut client_send); } else { @@ -330,6 +308,15 @@ impl SharedState { let mut shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); (*shared_data).running = set_to; } + pub fn is_server_running(&self) -> bool { + let shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); + (*shared_data).server_running + } + pub fn stop_server_running(&self) { + let mut shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); + (*shared_data).running = false; + (*shared_data).server_running = false; + } pub fn is_paused(&self) -> bool { let shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); (*shared_data).paused @@ -450,6 +437,7 @@ pub struct SharedStateInner { pub(crate) paused: bool, pub(crate) connection_history: ConnectionHistory, pub(crate) running: bool, + pub(crate) server_running: bool, pub(crate) solution_tab: SolutionTabState, pub(crate) baseline_tab: BaselineTabState, } @@ -465,6 +453,7 @@ impl SharedStateInner { paused: false, connection_history, running: false, + server_running: true, solution_tab: SolutionTabState::new(), baseline_tab: BaselineTabState::new(), } @@ -623,6 +612,7 @@ impl SolutionVelocityTabState { } // Main Tab Types. +#[derive(Clone, Copy)] pub enum RealtimeDelay { On, Off, @@ -815,7 +805,7 @@ impl ConnectionHistory { } // Enum wrapping around various Vel NED Message types. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct FlowControl(SPFlowControl); impl FromStr for FlowControl { @@ -1841,49 +1831,201 @@ impl std::str::FromStr for VelocityUnits { } } -pub struct Connection { - pub name: String, - pub rdr: Box, - pub wtr: Box, +#[derive(Clone)] +pub struct TcpConnection { + name: String, + host: String, + port: u16, } - -impl Connection { - pub fn tcp(host: String, port: u16) -> Result { +impl TcpConnection { + fn new(host: String, port: u16) -> Self { let name = format!("{}:{}", host, port); - let rdr = TcpStream::connect(&name)?; + Self { name, host, port } + } + fn socket_addrs(name: String) -> Result { + let socket = &mut name.to_socket_addrs()?; + let socket = if let Some(socket_) = socket.next() { + socket_ + } else { + let e: Box = String::from(TCP_CONNECTION_PARSING_FAILURE).into(); + return Err(e); + }; + Ok(socket) + } +} +impl ConnectionType for TcpConnection { + fn name(&self) -> String { + self.name.clone() + } + fn try_connect( + self, + shared_state: Option, + ) -> Result<(Box, Box)> { + let socket = TcpConnection::socket_addrs(self.name.clone())?; + let rdr = + TcpStream::connect_timeout(&socket, Duration::from_millis(SERIALPORT_READ_TIMEOUT_MS))?; let wtr = rdr.try_clone()?; - Ok(Self { - name, - rdr: Box::new(rdr), - wtr: Box::new(wtr), - }) + info!("Connected to tcp stream!"); + if let Some(shared_state_) = shared_state { + shared_state_.update_tcp_history(self.host, self.port); + } + Ok((Box::new(rdr), Box::new(wtr))) } +} - pub fn serial(device: String, baudrate: u32, flow: FlowControl) -> Result { - let rdr = serialport::new(&device, baudrate) - .flow_control(*flow) +#[derive(Clone)] +pub struct SerialConnection { + pub name: String, + pub device: String, + pub baudrate: u32, + pub flow: FlowControl, +} +impl SerialConnection { + fn new(device: String, baudrate: u32, flow: FlowControl) -> Self { + Self { + name: format!("{} @{}", device, baudrate), + device, + baudrate, + flow, + } + } +} +impl ConnectionType for SerialConnection { + fn name(&self) -> String { + self.name.clone() + } + fn try_connect( + self, + _shared_state: Option, + ) -> Result<(Box, Box)> { + let rdr = serialport::new(self.device, self.baudrate) + .flow_control(*self.flow) .timeout(Duration::from_millis(SERIALPORT_READ_TIMEOUT_MS)) .open()?; let wtr = rdr.try_clone()?; - Ok(Self { - name: device, - rdr: Box::new(rdr), - wtr: Box::new(wtr), - }) + info!("Opened serial port successfully!"); + Ok((Box::new(rdr), Box::new(wtr))) } +} - pub fn file(filename: String) -> Result { - let rdr = fs::File::open(&filename)?; +#[derive(Clone)] +pub struct FileConnection { + pub name: String, + pub filepath: PathBuf, + close_when_done: bool, + realtime_delay: RealtimeDelay, +} +impl FileConnection { + fn new>( + filepath: P, + close_when_done: bool, + realtime_delay: RealtimeDelay, + ) -> Self { + let filepath = PathBuf::from(filepath.as_ref()); + let name = if let Some(path) = filepath.file_name() { + path + } else { + filepath.as_os_str() + }; + let name: &str = &*name.to_string_lossy(); + Self { + name: String::from(name), + filepath, + close_when_done, + realtime_delay, + } + } +} +impl ConnectionType for FileConnection { + fn name(&self) -> String { + self.name.clone() + } + fn close_when_done(&self) -> bool { + self.close_when_done + } + fn realtime_delay(&self) -> RealtimeDelay { + self.realtime_delay + } + fn try_connect( + self, + shared_state: Option, + ) -> Result<(Box, Box)> { + let rdr = fs::File::open(&self.filepath)?; let wtr = io::sink(); - Ok(Self { - name: filename, - rdr: Box::new(rdr), - wtr: Box::new(wtr), - }) + info!("Opened file successfully!"); + if let Some(shared_state_) = shared_state { + shared_state_.update_file_history(self.filepath.to_string_lossy().to_string()); + } + Ok((Box::new(rdr), Box::new(wtr))) } +} - pub fn into_io(self) -> (Box, Box) { - (self.rdr, self.wtr) +trait ConnectionType { + fn close_when_done(&self) -> bool { + false + } + fn name(&self) -> String; + fn realtime_delay(&self) -> RealtimeDelay { + RealtimeDelay::Off + } + fn try_connect( + self, + shared_state: Option, + ) -> Result<(Box, Box)>; +} + +#[derive(Clone)] +pub enum Connection { + Tcp(TcpConnection), + File(FileConnection), + Serial(SerialConnection), +} +impl Connection { + pub fn tcp(host: String, port: u16) -> Self { + Connection::Tcp(TcpConnection::new(host, port)) + } + + pub fn serial(device: String, baudrate: u32, flow: FlowControl) -> Self { + Connection::Serial(SerialConnection::new(device, baudrate, flow)) + } + + pub fn file(filename: String, realtime_delay: RealtimeDelay, close_when_done: bool) -> Self { + Connection::File(FileConnection::new( + filename, + close_when_done, + realtime_delay, + )) + } + pub fn close_when_done(&self) -> bool { + match self { + Connection::Tcp(conn) => conn.close_when_done(), + Connection::File(conn) => conn.close_when_done(), + Connection::Serial(conn) => conn.close_when_done(), + } + } + pub fn name(&self) -> String { + match self { + Connection::Tcp(conn) => conn.name(), + Connection::File(conn) => conn.name(), + Connection::Serial(conn) => conn.name(), + } + } + pub fn realtime_delay(&self) -> RealtimeDelay { + match self { + Connection::Tcp(conn) => conn.realtime_delay(), + Connection::File(conn) => conn.realtime_delay(), + Connection::Serial(conn) => conn.realtime_delay(), + } + } + pub fn try_connect( + &self, + shared_state: Option, + ) -> Result<(Box, Box)> { + match self { + Connection::Tcp(conn) => conn.clone().try_connect(shared_state), + Connection::File(conn) => conn.clone().try_connect(shared_state), + Connection::Serial(conn) => conn.clone().try_connect(shared_state), + } } } @@ -1891,15 +2033,15 @@ impl Connection { mod tests { use super::*; use serial_test::serial; - use std::{ - sync::mpsc, - thread::sleep, - time::{Duration, SystemTime}, - }; - const TEST_FILEPATH: &str = "./tests/data/piksi-relay-1min.sbp"; - const TEST_SHORT_FILEPATH: &str = "./tests/data/piksi-relay.sbp"; - const SBP_FILE_SHORT_DURATION_SEC: f64 = 27.1; - const DELAY_BEFORE_CHECKING_APP_STARTED_IN_MS: u64 = 150; + // use std::{ + // sync::mpsc, + // thread::sleep, + // time::{Duration, SystemTime}, + // }; + // const TEST_FILEPATH: &str = "./tests/data/piksi-relay-1min.sbp"; + // const TEST_SHORT_FILEPATH: &str = "./tests/data/piksi-relay.sbp"; + // const SBP_FILE_SHORT_DURATION_SEC: f64 = 27.1; + // const DELAY_BEFORE_CHECKING_APP_STARTED_IN_MS: u64 = 150; pub mod data_directories { #![allow(dead_code)] @@ -2037,136 +2179,136 @@ mod tests { restore_backup_file(bfilename); } - fn receive_thread(client_recv: mpsc::Receiver>) -> JoinHandle<()> { - thread::spawn(move || { - let mut iter_count = 0; - - loop { - if client_recv.recv().is_err() { - break; - } + // fn receive_thread(client_recv: mpsc::Receiver>) -> JoinHandle<()> { + // thread::spawn(move || { + // let mut iter_count = 0; - iter_count += 1; - } - assert!(iter_count > 0); - }) - } + // loop { + // if client_recv.recv().is_err() { + // break; + // } - #[test] - #[serial] - fn connect_to_file_test() { - let bfilename = filename(); - backup_file(bfilename.clone()); - let shared_state = SharedState::new(); - let (client_send_, client_receive) = mpsc::channel::>(); - let client_send = ClientSender { - inner: client_send_, - }; - let server_state = ServerState::new(); - let filename = TEST_SHORT_FILEPATH.to_string(); - receive_thread(client_receive); - assert!(!shared_state.is_running()); - server_state - .connect_to_file( - client_send, - shared_state.clone(), - filename, - /*close_when_done = */ true, - ) - .unwrap(); - sleep(Duration::from_millis( - DELAY_BEFORE_CHECKING_APP_STARTED_IN_MS, - )); - assert!(shared_state.is_running()); - sleep(Duration::from_secs_f64(SBP_FILE_SHORT_DURATION_SEC)); - assert!(!shared_state.is_running()); - restore_backup_file(bfilename); - } + // iter_count += 1; + // } + // assert!(iter_count > 0); + // }) + // } - #[test] - #[serial] - fn pause_via_connect_to_file_test() { - let bfilename = filename(); - backup_file(bfilename.clone()); - let shared_state = SharedState::new(); - let (client_send_, client_receive) = mpsc::channel::>(); - let client_send = ClientSender { - inner: client_send_, - }; - let server_state = ServerState::new(); - let filename = TEST_SHORT_FILEPATH.to_string(); - receive_thread(client_receive); - assert!(!shared_state.is_running()); - server_state - .connect_to_file( - client_send, - shared_state.clone(), - filename, - /*close_when_done = */ true, - ) - .unwrap(); - sleep(Duration::from_millis( - DELAY_BEFORE_CHECKING_APP_STARTED_IN_MS, - )); - assert!(shared_state.is_running()); - shared_state.set_paused(true); - sleep(Duration::from_secs_f64(SBP_FILE_SHORT_DURATION_SEC)); - assert!(shared_state.is_running()); - shared_state.set_paused(false); - sleep(Duration::from_secs_f64(SBP_FILE_SHORT_DURATION_SEC)); - assert!(!shared_state.is_running()); - restore_backup_file(bfilename); - } + // #[test] + // #[serial] + // fn connect_to_file_test() { + // let bfilename = filename(); + // backup_file(bfilename.clone()); + // let shared_state = SharedState::new(); + // let (client_send_, client_receive) = mpsc::channel::>(); + // let client_send = ClientSender { + // inner: client_send_, + // }; + // let server_state = ServerState::new(); + // let filename = TEST_SHORT_FILEPATH.to_string(); + // receive_thread(client_receive); + // assert!(!shared_state.is_running()); + // server_state + // .connect_to_file( + // client_send, + // shared_state.clone(), + // filename, + // /*close_when_done = */ true, + // ) + // .unwrap(); + // sleep(Duration::from_millis( + // DELAY_BEFORE_CHECKING_APP_STARTED_IN_MS, + // )); + // assert!(shared_state.is_running()); + // sleep(Duration::from_secs_f64(SBP_FILE_SHORT_DURATION_SEC)); + // assert!(!shared_state.is_running()); + // restore_backup_file(bfilename); + // } - #[test] - #[serial] - fn disconnect_via_connect_to_file_test() { - let bfilename = filename(); - backup_file(bfilename.clone()); - let shared_state = SharedState::new(); - let (client_send_, client_receive) = mpsc::channel::>(); - let client_send = ClientSender { - inner: client_send_, - }; - let server_state = ServerState::new(); - let filename = TEST_FILEPATH.to_string(); - let expected_duration = Duration::from_millis(100); - let handle = receive_thread(client_receive); - assert!(!shared_state.is_running()); - { - server_state - .connect_to_file( - client_send.clone(), - shared_state.clone(), - filename, - /*close_when_done = */ true, - ) - .unwrap(); - } + // #[test] + // #[serial] + // fn pause_via_connect_to_file_test() { + // let bfilename = filename(); + // backup_file(bfilename.clone()); + // let shared_state = SharedState::new(); + // let (client_send_, client_receive) = mpsc::channel::>(); + // let client_send = ClientSender { + // inner: client_send_, + // }; + // let server_state = ServerState::new(); + // let filename = TEST_SHORT_FILEPATH.to_string(); + // receive_thread(client_receive); + // assert!(!shared_state.is_running()); + // server_state + // .connect_to_file( + // client_send, + // shared_state.clone(), + // filename, + // /*close_when_done = */ true, + // ) + // .unwrap(); + // sleep(Duration::from_millis( + // DELAY_BEFORE_CHECKING_APP_STARTED_IN_MS, + // )); + // assert!(shared_state.is_running()); + // shared_state.set_paused(true); + // sleep(Duration::from_secs_f64(SBP_FILE_SHORT_DURATION_SEC)); + // assert!(shared_state.is_running()); + // shared_state.set_paused(false); + // sleep(Duration::from_secs_f64(SBP_FILE_SHORT_DURATION_SEC)); + // assert!(!shared_state.is_running()); + // restore_backup_file(bfilename); + // } - sleep(Duration::from_millis(5)); - assert!(shared_state.is_running()); - let now = SystemTime::now(); - sleep(Duration::from_millis(1)); - shared_state.set_running(false, client_send); - sleep(Duration::from_millis(10)); - assert!(handle.join().is_ok()); - - match now.elapsed() { - Ok(elapsed) => { - assert!( - elapsed < expected_duration, - "Time elapsed for disconnect test {:?}, expecting {:?}ms", - elapsed, - expected_duration - ); - } - Err(e) => { - panic!("unknown error {}", e); - } - } - restore_backup_file(bfilename); - } + // #[test] + // #[serial] + // fn disconnect_via_connect_to_file_test() { + // let bfilename = filename(); + // backup_file(bfilename.clone()); + // let shared_state = SharedState::new(); + // let (client_send_, client_receive) = mpsc::channel::>(); + // let client_send = ClientSender { + // inner: client_send_, + // }; + // let server_state = ServerState::new(); + // let filename = TEST_FILEPATH.to_string(); + // let expected_duration = Duration::from_millis(100); + // let handle = receive_thread(client_receive); + // assert!(!shared_state.is_running()); + // { + // server_state + // .connect_to_file( + // client_send.clone(), + // shared_state.clone(), + // filename, + // /*close_when_done = */ true, + // ) + // .unwrap(); + // } + + // sleep(Duration::from_millis(5)); + // assert!(shared_state.is_running()); + // let now = SystemTime::now(); + // sleep(Duration::from_millis(1)); + // shared_state.set_running(false, client_send); + // sleep(Duration::from_millis(10)); + // assert!(handle.join().is_ok()); + + // match now.elapsed() { + // Ok(elapsed) => { + // assert!( + // elapsed < expected_duration, + // "Time elapsed for disconnect test {:?}, expecting {:?}ms", + // elapsed, + // expected_duration + // ); + // } + // Err(e) => { + // panic!("unknown error {}", e); + // } + // } + // restore_backup_file(bfilename); + // } // TODO(johnmichael.burke@) [CPP-111] Need to implement unittest for TCPStream. // #[test] diff --git a/console_backend/tests/mem_benches.rs b/console_backend/tests/mem_benches.rs index 19c7ab638..ad4b68222 100644 --- a/console_backend/tests/mem_benches.rs +++ b/console_backend/tests/mem_benches.rs @@ -98,8 +98,12 @@ mod mem_bench_impl { }; let shared_state = SharedState::new(); shared_state.set_running(true, client_send.clone()); - let conn = Connection::file(BENCH_FILEPATH.into()).unwrap(); - process_messages::process_messages(conn, shared_state, client_send, RealtimeDelay::On); + let conn = Connection::file( + BENCH_FILEPATH.into(), + RealtimeDelay::On, + /*close_when_done=*/ true, + ); + process_messages::process_messages(conn, shared_state, client_send).unwrap(); } recv_thread.join().expect("join should succeed"); mem_read_thread.join().expect("join should succeed"); From ad762f6c6321cbb50e1d89ff01116644d97f4310 Mon Sep 17 00:00:00 2001 From: John Michael Burke Date: Fri, 9 Jul 2021 17:21:14 -0700 Subject: [PATCH 2/6] Added logic to handle timeouts/internet drops. --- console_backend/src/process_messages.rs | 22 ++++++++++++++++++---- console_backend/src/types.rs | 4 ++++ 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/console_backend/src/process_messages.rs b/console_backend/src/process_messages.rs index 1e7ff9a4f..484b217c7 100644 --- a/console_backend/src/process_messages.rs +++ b/console_backend/src/process_messages.rs @@ -1,10 +1,10 @@ use log::{debug, error}; -use sbp::sbp_tools::SBPTools; +use sbp::sbp_tools::{ControlFlow, SBPTools}; use sbp::{ messages::{SBPMessage, SBP}, serialize::SbpSerialize, }; -use std::{thread::sleep, time::Duration}; +use std::{io::ErrorKind, thread::sleep, time::Duration}; use crate::constants::PAUSE_LOOP_SLEEP_DURATION_MS; use crate::log_panel::handle_log_msg; @@ -20,14 +20,28 @@ pub fn process_messages( where S: MessageSender, { + shared_state.set_running(true, client_send.clone()); let realtime_delay = conn.realtime_delay(); let (rdr, _) = conn.try_connect(Some(shared_state.clone()))?; - shared_state.set_running(true, client_send.clone()); shared_state.set_current_connection(conn.name()); refresh_navbar(&mut client_send.clone(), shared_state.clone()); let mut main = MainTab::new(shared_state.clone(), client_send.clone()); let messages = sbp::iter_messages(rdr) - .log_errors(log::Level::Debug) + .handle_errors(|e| { + debug!("{}", e); + match e { + sbp::Error::IoError(err) => { + match (*err).kind() { + ErrorKind::TimedOut => { + shared_state.set_running(false, client_send.clone()); + } + _ => {} + } + ControlFlow::Break + } + _ => ControlFlow::Continue, + } + }) .with_rover_time(); for (message, gps_time) in messages { if !shared_state.is_running() { diff --git a/console_backend/src/types.rs b/console_backend/src/types.rs index 014b7b7e5..2dc4f06c2 100644 --- a/console_backend/src/types.rs +++ b/console_backend/src/types.rs @@ -221,6 +221,9 @@ impl ServerState { { error!("unable to process messages, {}", e); } + if !shared_state.is_running() { + conn = None; + } shared_state.set_running(false, client_send.clone()); } } @@ -1864,6 +1867,7 @@ impl ConnectionType for TcpConnection { let socket = TcpConnection::socket_addrs(self.name.clone())?; let rdr = TcpStream::connect_timeout(&socket, Duration::from_millis(SERIALPORT_READ_TIMEOUT_MS))?; + rdr.set_read_timeout(Some(Duration::from_millis(SERIALPORT_READ_TIMEOUT_MS)))?; let wtr = rdr.try_clone()?; info!("Connected to tcp stream!"); if let Some(shared_state_) = shared_state { From 8d849f45ed0366ac284efd69eac4d4f7290fd5c6 Mon Sep 17 00:00:00 2001 From: John Michael Burke Date: Fri, 9 Jul 2021 18:26:06 -0700 Subject: [PATCH 3/6] Fixed unittests with new conn logic in types.rs. --- console_backend/src/process_messages.rs | 8 +- console_backend/src/types.rs | 266 ++++++++++++------------ 2 files changed, 134 insertions(+), 140 deletions(-) diff --git a/console_backend/src/process_messages.rs b/console_backend/src/process_messages.rs index 484b217c7..97b1d5a47 100644 --- a/console_backend/src/process_messages.rs +++ b/console_backend/src/process_messages.rs @@ -31,11 +31,8 @@ where debug!("{}", e); match e { sbp::Error::IoError(err) => { - match (*err).kind() { - ErrorKind::TimedOut => { - shared_state.set_running(false, client_send.clone()); - } - _ => {} + if (*err).kind() == ErrorKind::TimedOut { + shared_state.set_running(false, client_send.clone()); } ControlFlow::Break } @@ -202,6 +199,7 @@ where log::logger().flush(); } if conn.close_when_done() { + shared_state.set_running(false, client_send.clone()); close_frontend(&mut client_send); } Ok(()) diff --git a/console_backend/src/types.rs b/console_backend/src/types.rs index 2dc4f06c2..a00fa297a 100644 --- a/console_backend/src/types.rs +++ b/console_backend/src/types.rs @@ -121,7 +121,7 @@ impl Deque { pub trait MessageSender: Debug + Clone + Send { fn send_data(&mut self, msg_bytes: Vec); } - +use std::ops::Drop; #[derive(Debug, Clone)] pub struct ClientSender { pub inner: sync::mpsc::Sender>, @@ -2037,15 +2037,15 @@ impl Connection { mod tests { use super::*; use serial_test::serial; - // use std::{ - // sync::mpsc, - // thread::sleep, - // time::{Duration, SystemTime}, - // }; - // const TEST_FILEPATH: &str = "./tests/data/piksi-relay-1min.sbp"; - // const TEST_SHORT_FILEPATH: &str = "./tests/data/piksi-relay.sbp"; - // const SBP_FILE_SHORT_DURATION_SEC: f64 = 27.1; - // const DELAY_BEFORE_CHECKING_APP_STARTED_IN_MS: u64 = 150; + use std::{ + sync::mpsc, + thread::sleep, + time::{Duration, SystemTime}, + }; + const TEST_FILEPATH: &str = "./tests/data/piksi-relay-1min.sbp"; + const TEST_SHORT_FILEPATH: &str = "./tests/data/piksi-relay.sbp"; + const SBP_FILE_SHORT_DURATION_SEC: f64 = 27.1; + const DELAY_BEFORE_CHECKING_APP_STARTED_IN_MS: u64 = 150; pub mod data_directories { #![allow(dead_code)] @@ -2183,136 +2183,132 @@ mod tests { restore_backup_file(bfilename); } - // fn receive_thread(client_recv: mpsc::Receiver>) -> JoinHandle<()> { - // thread::spawn(move || { - // let mut iter_count = 0; + fn receive_thread(client_recv: mpsc::Receiver>) -> JoinHandle<()> { + thread::spawn(move || { + let mut iter_count = 0; - // loop { - // if client_recv.recv().is_err() { - // break; - // } + loop { + if client_recv.recv().is_err() { + break; + } - // iter_count += 1; - // } - // assert!(iter_count > 0); - // }) - // } + iter_count += 1; + } + assert!(iter_count > 0); + }) + } - // #[test] - // #[serial] - // fn connect_to_file_test() { - // let bfilename = filename(); - // backup_file(bfilename.clone()); - // let shared_state = SharedState::new(); - // let (client_send_, client_receive) = mpsc::channel::>(); - // let client_send = ClientSender { - // inner: client_send_, - // }; - // let server_state = ServerState::new(); - // let filename = TEST_SHORT_FILEPATH.to_string(); - // receive_thread(client_receive); - // assert!(!shared_state.is_running()); - // server_state - // .connect_to_file( - // client_send, - // shared_state.clone(), - // filename, - // /*close_when_done = */ true, - // ) - // .unwrap(); - // sleep(Duration::from_millis( - // DELAY_BEFORE_CHECKING_APP_STARTED_IN_MS, - // )); - // assert!(shared_state.is_running()); - // sleep(Duration::from_secs_f64(SBP_FILE_SHORT_DURATION_SEC)); - // assert!(!shared_state.is_running()); - // restore_backup_file(bfilename); - // } + #[test] + #[serial] + fn connect_to_file_test() { + let bfilename = filename(); + backup_file(bfilename.clone()); + let shared_state = SharedState::new(); + let (client_send_, client_receive) = mpsc::channel::>(); + let client_send = ClientSender { + inner: client_send_, + }; + let server_state = ServerState::new(client_send, shared_state.clone()); + let filename = TEST_SHORT_FILEPATH.to_string(); + receive_thread(client_receive); + assert!(!shared_state.is_running()); + server_state.connect_to_file( + filename, + RealtimeDelay::On, + /*close_when_done = */ true, + ); + sleep(Duration::from_millis( + DELAY_BEFORE_CHECKING_APP_STARTED_IN_MS, + )); + assert!(shared_state.is_running()); + sleep(Duration::from_secs_f64(SBP_FILE_SHORT_DURATION_SEC)); + assert!(!shared_state.is_running()); + restore_backup_file(bfilename); + } - // #[test] - // #[serial] - // fn pause_via_connect_to_file_test() { - // let bfilename = filename(); - // backup_file(bfilename.clone()); - // let shared_state = SharedState::new(); - // let (client_send_, client_receive) = mpsc::channel::>(); - // let client_send = ClientSender { - // inner: client_send_, - // }; - // let server_state = ServerState::new(); - // let filename = TEST_SHORT_FILEPATH.to_string(); - // receive_thread(client_receive); - // assert!(!shared_state.is_running()); - // server_state - // .connect_to_file( - // client_send, - // shared_state.clone(), - // filename, - // /*close_when_done = */ true, - // ) - // .unwrap(); - // sleep(Duration::from_millis( - // DELAY_BEFORE_CHECKING_APP_STARTED_IN_MS, - // )); - // assert!(shared_state.is_running()); - // shared_state.set_paused(true); - // sleep(Duration::from_secs_f64(SBP_FILE_SHORT_DURATION_SEC)); - // assert!(shared_state.is_running()); - // shared_state.set_paused(false); - // sleep(Duration::from_secs_f64(SBP_FILE_SHORT_DURATION_SEC)); - // assert!(!shared_state.is_running()); - // restore_backup_file(bfilename); - // } + #[test] + #[serial] + fn pause_via_connect_to_file_test() { + let bfilename = filename(); + backup_file(bfilename.clone()); + let shared_state = SharedState::new(); + let (client_send_, client_receive) = mpsc::channel::>(); + let client_send = ClientSender { + inner: client_send_, + }; + let server_state = ServerState::new(client_send, shared_state.clone()); + let filename = TEST_SHORT_FILEPATH.to_string(); + receive_thread(client_receive); + assert!(!shared_state.is_running()); + server_state.connect_to_file( + filename, + RealtimeDelay::On, + /*close_when_done = */ true, + ); + sleep(Duration::from_millis( + DELAY_BEFORE_CHECKING_APP_STARTED_IN_MS, + )); + assert!(shared_state.is_running()); + shared_state.set_paused(true); + sleep(Duration::from_secs_f64(SBP_FILE_SHORT_DURATION_SEC)); + assert!(shared_state.is_running()); + shared_state.set_paused(false); + sleep(Duration::from_secs_f64(SBP_FILE_SHORT_DURATION_SEC)); + assert!(!shared_state.is_running()); + restore_backup_file(bfilename); + println!("done"); + } - // #[test] - // #[serial] - // fn disconnect_via_connect_to_file_test() { - // let bfilename = filename(); - // backup_file(bfilename.clone()); - // let shared_state = SharedState::new(); - // let (client_send_, client_receive) = mpsc::channel::>(); - // let client_send = ClientSender { - // inner: client_send_, - // }; - // let server_state = ServerState::new(); - // let filename = TEST_FILEPATH.to_string(); - // let expected_duration = Duration::from_millis(100); - // let handle = receive_thread(client_receive); - // assert!(!shared_state.is_running()); - // { - // server_state - // .connect_to_file( - // client_send.clone(), - // shared_state.clone(), - // filename, - // /*close_when_done = */ true, - // ) - // .unwrap(); - // } - - // sleep(Duration::from_millis(5)); - // assert!(shared_state.is_running()); - // let now = SystemTime::now(); - // sleep(Duration::from_millis(1)); - // shared_state.set_running(false, client_send); - // sleep(Duration::from_millis(10)); - // assert!(handle.join().is_ok()); - - // match now.elapsed() { - // Ok(elapsed) => { - // assert!( - // elapsed < expected_duration, - // "Time elapsed for disconnect test {:?}, expecting {:?}ms", - // elapsed, - // expected_duration - // ); - // } - // Err(e) => { - // panic!("unknown error {}", e); - // } - // } - // restore_backup_file(bfilename); - // } + #[test] + #[serial] + fn disconnect_via_connect_to_file_test() { + let bfilename = filename(); + backup_file(bfilename.clone()); + let shared_state = SharedState::new(); + let (client_send_, client_receive) = mpsc::channel::>(); + let client_send = ClientSender { + inner: client_send_, + }; + let server_state = ServerState::new(client_send.clone(), shared_state.clone()); + let filename = TEST_FILEPATH.to_string(); + let expected_duration = Duration::from_secs_f64(SERVER_STATE_CONNECTION_LOOP_TIMEOUT_SEC) + + Duration::from_millis(100); + let handle = receive_thread(client_receive); + assert!(!shared_state.is_running()); + { + server_state.connect_to_file( + filename, + RealtimeDelay::On, + /*close_when_done = */ true, + ); + } + + sleep(Duration::from_millis(5)); + assert!(shared_state.is_running()); + let now = SystemTime::now(); + sleep(Duration::from_millis(1)); + server_state.disconnect(client_send.clone()); + sleep(Duration::from_millis(10)); + assert!(!shared_state.is_running()); + shared_state.stop_server_running(); + drop(client_send); + assert!(handle.join().is_ok()); + + match now.elapsed() { + Ok(elapsed) => { + assert!( + elapsed < expected_duration, + "Time elapsed for disconnect test {:?}, expecting {:?}ms", + elapsed, + expected_duration + ); + } + Err(e) => { + panic!("unknown error {}", e); + } + } + restore_backup_file(bfilename); + } // TODO(johnmichael.burke@) [CPP-111] Need to implement unittest for TCPStream. // #[test] From a1850b948dd8f0bedb637256d543a3c4c5041307 Mon Sep 17 00:00:00 2001 From: John Michael Burke Date: Mon, 12 Jul 2021 10:04:43 -0700 Subject: [PATCH 4/6] ServerState->ConnectionState, put all conn in connection.rs. --- console_backend/benches/cpu_benches.rs | 3 +- console_backend/src/cli_options.rs | 2 +- console_backend/src/connection.rs | 519 ++++++++++++++++++++++++ console_backend/src/lib.rs | 1 + console_backend/src/process_messages.rs | 1 + console_backend/src/server.rs | 25 +- console_backend/src/types.rs | 480 +--------------------- console_backend/tests/mem_benches.rs | 3 +- 8 files changed, 545 insertions(+), 489 deletions(-) create mode 100644 console_backend/src/connection.rs diff --git a/console_backend/benches/cpu_benches.rs b/console_backend/benches/cpu_benches.rs index f896b9ba6..1c526a034 100644 --- a/console_backend/benches/cpu_benches.rs +++ b/console_backend/benches/cpu_benches.rs @@ -11,8 +11,9 @@ use std::{ extern crate console_backend; use console_backend::{ + connection::Connection, process_messages, - types::{ClientSender, Connection, RealtimeDelay, SharedState}, + types::{ClientSender, RealtimeDelay, SharedState}, }; const BENCH_FILEPATH: &str = "./tests/data/piksi-relay.sbp"; diff --git a/console_backend/src/cli_options.rs b/console_backend/src/cli_options.rs index 91b820996..f7428d87b 100644 --- a/console_backend/src/cli_options.rs +++ b/console_backend/src/cli_options.rs @@ -11,7 +11,7 @@ use crate::log_panel::LogLevel; use crate::types::FlowControl; use crate::{ common_constants::{SbpLogging, Tabs}, - types::Connection, + connection::Connection, }; #[derive(Debug)] diff --git a/console_backend/src/connection.rs b/console_backend/src/connection.rs new file mode 100644 index 000000000..34e0380e8 --- /dev/null +++ b/console_backend/src/connection.rs @@ -0,0 +1,519 @@ +use crate::constants::*; +use crate::errors::*; +use crate::process_messages::process_messages; +use crate::types::*; +use chrono::{DateTime, Utc}; +use crossbeam::channel::{unbounded, Receiver, Sender}; +use log::{error, info}; +use std::{ + fmt::Debug, + fs, io, + net::{SocketAddr, TcpStream, ToSocketAddrs}, + ops::Drop, + path::{Path, PathBuf}, + thread, + thread::JoinHandle, + time::Duration, +}; + +pub type Error = std::boxed::Box; +pub type Result = std::result::Result; +pub type UtcDateTime = DateTime; + +#[derive(Clone)] +pub struct TcpConnection { + name: String, + host: String, + port: u16, +} +impl TcpConnection { + fn new(host: String, port: u16) -> Self { + let name = format!("{}:{}", host, port); + Self { name, host, port } + } + fn socket_addrs(name: String) -> Result { + let socket = &mut name.to_socket_addrs()?; + let socket = if let Some(socket_) = socket.next() { + socket_ + } else { + let e: Box = String::from(TCP_CONNECTION_PARSING_FAILURE).into(); + return Err(e); + }; + Ok(socket) + } + fn name(&self) -> String { + self.name.clone() + } + fn try_connect( + self, + shared_state: Option, + ) -> Result<(Box, Box)> { + let socket = TcpConnection::socket_addrs(self.name.clone())?; + let rdr = + TcpStream::connect_timeout(&socket, Duration::from_millis(SERIALPORT_READ_TIMEOUT_MS))?; + rdr.set_read_timeout(Some(Duration::from_millis(SERIALPORT_READ_TIMEOUT_MS)))?; + let wtr = rdr.try_clone()?; + info!("Connected to tcp stream!"); + if let Some(shared_state_) = shared_state { + shared_state_.update_tcp_history(self.host, self.port); + } + Ok((Box::new(rdr), Box::new(wtr))) + } +} + +#[derive(Clone)] +pub struct SerialConnection { + pub name: String, + pub device: String, + pub baudrate: u32, + pub flow: FlowControl, +} +impl SerialConnection { + fn new(device: String, baudrate: u32, flow: FlowControl) -> Self { + Self { + name: format!("{} @{}", device, baudrate), + device, + baudrate, + flow, + } + } + fn name(&self) -> String { + self.name.clone() + } + fn try_connect( + self, + _shared_state: Option, + ) -> Result<(Box, Box)> { + let rdr = serialport::new(self.device, self.baudrate) + .flow_control(*self.flow) + .timeout(Duration::from_millis(SERIALPORT_READ_TIMEOUT_MS)) + .open()?; + let wtr = rdr.try_clone()?; + info!("Opened serial port successfully!"); + Ok((Box::new(rdr), Box::new(wtr))) + } +} + +#[derive(Clone)] +pub struct FileConnection { + pub name: String, + pub filepath: PathBuf, + close_when_done: bool, + realtime_delay: RealtimeDelay, +} +impl FileConnection { + fn new>( + filepath: P, + close_when_done: bool, + realtime_delay: RealtimeDelay, + ) -> Self { + let filepath = PathBuf::from(filepath.as_ref()); + let name = if let Some(path) = filepath.file_name() { + path + } else { + filepath.as_os_str() + }; + let name: &str = &*name.to_string_lossy(); + Self { + name: String::from(name), + filepath, + close_when_done, + realtime_delay, + } + } + + fn name(&self) -> String { + self.name.clone() + } + fn close_when_done(&self) -> bool { + self.close_when_done + } + fn realtime_delay(&self) -> RealtimeDelay { + self.realtime_delay + } + fn try_connect( + self, + shared_state: Option, + ) -> Result<(Box, Box)> { + let rdr = fs::File::open(&self.filepath)?; + let wtr = io::sink(); + info!("Opened file successfully!"); + if let Some(shared_state_) = shared_state { + shared_state_.update_file_history(self.filepath.to_string_lossy().to_string()); + } + Ok((Box::new(rdr), Box::new(wtr))) + } +} + +#[derive(Clone)] +pub enum Connection { + Tcp(TcpConnection), + File(FileConnection), + Serial(SerialConnection), +} +impl Connection { + pub fn tcp(host: String, port: u16) -> Self { + Connection::Tcp(TcpConnection::new(host, port)) + } + + pub fn serial(device: String, baudrate: u32, flow: FlowControl) -> Self { + Connection::Serial(SerialConnection::new(device, baudrate, flow)) + } + + pub fn file(filename: String, realtime_delay: RealtimeDelay, close_when_done: bool) -> Self { + Connection::File(FileConnection::new( + filename, + close_when_done, + realtime_delay, + )) + } + pub fn close_when_done(&self) -> bool { + match self { + Connection::File(conn) => conn.close_when_done(), + Connection::Tcp(_) | Connection::Serial(_) => false, + } + } + pub fn name(&self) -> String { + match self { + Connection::Tcp(conn) => conn.name(), + Connection::File(conn) => conn.name(), + Connection::Serial(conn) => conn.name(), + } + } + pub fn realtime_delay(&self) -> RealtimeDelay { + match self { + Connection::File(conn) => conn.realtime_delay(), + Connection::Tcp(_) | Connection::Serial(_) => RealtimeDelay::Off, + } + } + pub fn try_connect( + &self, + shared_state: Option, + ) -> Result<(Box, Box)> { + match self { + Connection::Tcp(conn) => conn.clone().try_connect(shared_state), + Connection::File(conn) => conn.clone().try_connect(shared_state), + Connection::Serial(conn) => conn.clone().try_connect(shared_state), + } + } +} + +#[derive(Debug)] +pub struct ConnectionState { + pub handler: Option>, + shared_state: SharedState, + sender: Sender>, + receiver: Receiver>, +} +impl ConnectionState { + pub fn new(client_send: ClientSender, shared_state: SharedState) -> ConnectionState { + let (sender, receiver) = unbounded(); + + ConnectionState { + handler: Some(ConnectionState::connect_thread( + client_send, + shared_state.clone(), + receiver.clone(), + )), + shared_state, + sender, + receiver, + } + } + + fn connect_thread( + client_send: ClientSender, + shared_state: SharedState, + receiver: Receiver>, + ) -> JoinHandle<()> { + thread::spawn(move || { + let mut conn = None; + while shared_state.clone().is_server_running() { + if let Ok(conn_option) = receiver.recv_timeout(Duration::from_secs_f64( + SERVER_STATE_CONNECTION_LOOP_TIMEOUT_SEC, + )) { + match conn_option { + Some(conn_) => { + conn = Some(conn_); + } + None => { + conn = None; + info!("Disconnected successfully."); + } + } + } + if let Some(conn_) = conn.clone() { + if let Err(e) = + process_messages(conn_, shared_state.clone(), client_send.clone()) + { + error!("unable to process messages, {}", e); + } + if !shared_state.is_running() { + conn = None; + } + shared_state.set_running(false, client_send.clone()); + } + } + }) + } + + /// Helper function for attempting to open a file and process SBP messages from it. + /// + /// # Parameters + /// - `filename`: The path to the filename to be read for SBP messages. + pub fn connect_to_file( + &self, + filename: String, + realtime_delay: RealtimeDelay, + close_when_done: bool, + ) { + let conn = Connection::file(filename, realtime_delay, close_when_done); + self.connect(conn); + } + + /// Helper function for attempting to open a tcp connection and process SBP messages from it. + /// + /// # Parameters + /// - `host`: The host portion of the TCP stream to open. + /// - `port`: The port to be used to open a TCP stream. + pub fn connect_to_host(&self, host: String, port: u16) { + let conn = Connection::tcp(host, port); + self.connect(conn); + } + + /// Helper function for attempting to open a serial port and process SBP messages from it. + /// + /// # Parameters + /// - `device`: The string path corresponding to the serial device to connect with. + /// - `baudrate`: The baudrate to use when communicating with the serial device. + /// - `flow`: The flow control mode to use when communicating with the serial device. + pub fn connect_to_serial(&self, device: String, baudrate: u32, flow: FlowControl) { + let conn = Connection::serial(device, baudrate, flow); + self.connect(conn); + } + + /// Send disconnect signal to server state loop. + pub fn disconnect(&self, client_send: S) { + self.shared_state.set_running(false, client_send); + if let Err(err) = self.sender.try_send(None) { + error!("{}, {}", SERVER_STATE_DISCONNECT_FAILURE, err); + } + } + + /// Helper function to send connection object to server state loop. + fn connect(&self, conn: Connection) { + if let Err(err) = self.sender.try_send(Some(conn)) { + error!("{}, {}", SERVER_STATE_NEW_CONNECTION_FAILURE, err); + } + } +} + +impl Drop for ConnectionState { + fn drop(&mut self) { + self.shared_state.stop_server_running(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use directories::UserDirs; + use serial_test::serial; + use std::{ + sync::mpsc, + thread::sleep, + time::{Duration, SystemTime}, + }; + const TEST_FILEPATH: &str = "./tests/data/piksi-relay-1min.sbp"; + const TEST_SHORT_FILEPATH: &str = "./tests/data/piksi-relay.sbp"; + const SBP_FILE_SHORT_DURATION_SEC: f64 = 27.1; + const DELAY_BEFORE_CHECKING_APP_STARTED_IN_MS: u64 = 150; + + pub mod data_directories { + #![allow(dead_code)] + pub const LINUX: &str = ".local/share/swift_navigation_console"; + pub const MACOS: &str = + "Library/Application Support/com.swift-nav.swift-nav.swift_navigation_console"; + pub const WINDOWS: &str = "AppData\\Local\\swift-nav\\swift_navigation_console\\data"; + } + + fn filename() -> PathBuf { + let user_dirs = UserDirs::new().unwrap(); + let home_dir = user_dirs.home_dir(); + #[cfg(target_os = "linux")] + { + home_dir + .join(data_directories::LINUX) + .join(CONNECTION_HISTORY_FILENAME) + } + + #[cfg(target_os = "macos")] + { + home_dir + .join(data_directories::MACOS) + .join(CONNECTION_HISTORY_FILENAME) + } + #[cfg(target_os = "windows")] + { + home_dir + .join(data_directories::WINDOWS) + .join(CONNECTION_HISTORY_FILENAME) + } + } + + fn backup_file(filename: PathBuf) { + if filename.exists() { + let mut backup_filename = filename.clone(); + backup_filename.set_extension("backup"); + fs::rename(filename, backup_filename).unwrap(); + } + } + + fn restore_backup_file(filename: PathBuf) { + let mut backup_filename = filename.clone(); + backup_filename.set_extension("backup"); + if filename.exists() { + fs::remove_file(filename.clone()).unwrap(); + } + if backup_filename.exists() { + fs::rename(backup_filename, filename).unwrap(); + } + } + + fn receive_thread(client_recv: mpsc::Receiver>) -> JoinHandle<()> { + thread::spawn(move || { + let mut iter_count = 0; + + loop { + if client_recv.recv().is_err() { + break; + } + + iter_count += 1; + } + assert!(iter_count > 0); + }) + } + + #[test] + #[serial] + fn connect_to_file_test() { + let bfilename = filename(); + backup_file(bfilename.clone()); + let shared_state = SharedState::new(); + let (client_send_, client_receive) = mpsc::channel::>(); + let client_send = ClientSender { + inner: client_send_, + }; + let connection_state = ConnectionState::new(client_send, shared_state.clone()); + let filename = TEST_SHORT_FILEPATH.to_string(); + receive_thread(client_receive); + assert!(!shared_state.is_running()); + connection_state.connect_to_file( + filename, + RealtimeDelay::On, + /*close_when_done = */ true, + ); + sleep(Duration::from_millis( + DELAY_BEFORE_CHECKING_APP_STARTED_IN_MS, + )); + assert!(shared_state.is_running()); + sleep(Duration::from_secs_f64(SBP_FILE_SHORT_DURATION_SEC)); + assert!(!shared_state.is_running()); + restore_backup_file(bfilename); + } + + #[test] + #[serial] + fn pause_via_connect_to_file_test() { + let bfilename = filename(); + backup_file(bfilename.clone()); + let shared_state = SharedState::new(); + let (client_send_, client_receive) = mpsc::channel::>(); + let client_send = ClientSender { + inner: client_send_, + }; + let connection_state = ConnectionState::new(client_send, shared_state.clone()); + let filename = TEST_SHORT_FILEPATH.to_string(); + receive_thread(client_receive); + assert!(!shared_state.is_running()); + connection_state.connect_to_file( + filename, + RealtimeDelay::On, + /*close_when_done = */ true, + ); + sleep(Duration::from_millis( + DELAY_BEFORE_CHECKING_APP_STARTED_IN_MS, + )); + assert!(shared_state.is_running()); + shared_state.set_paused(true); + sleep(Duration::from_secs_f64(SBP_FILE_SHORT_DURATION_SEC)); + assert!(shared_state.is_running()); + shared_state.set_paused(false); + sleep(Duration::from_secs_f64(SBP_FILE_SHORT_DURATION_SEC)); + assert!(!shared_state.is_running()); + restore_backup_file(bfilename); + println!("done"); + } + + #[test] + #[serial] + fn disconnect_via_connect_to_file_test() { + let bfilename = filename(); + backup_file(bfilename.clone()); + let shared_state = SharedState::new(); + let (client_send_, client_receive) = mpsc::channel::>(); + let client_send = ClientSender { + inner: client_send_, + }; + let connection_state = ConnectionState::new(client_send.clone(), shared_state.clone()); + let filename = TEST_FILEPATH.to_string(); + let expected_duration = Duration::from_secs_f64(SERVER_STATE_CONNECTION_LOOP_TIMEOUT_SEC) + + Duration::from_millis(100); + let handle = receive_thread(client_receive); + assert!(!shared_state.is_running()); + { + connection_state.connect_to_file( + filename, + RealtimeDelay::On, + /*close_when_done = */ true, + ); + } + + sleep(Duration::from_millis(5)); + assert!(shared_state.is_running()); + let now = SystemTime::now(); + sleep(Duration::from_millis(1)); + connection_state.disconnect(client_send.clone()); + sleep(Duration::from_millis(10)); + assert!(!shared_state.is_running()); + shared_state.stop_server_running(); + drop(client_send); + assert!(handle.join().is_ok()); + + match now.elapsed() { + Ok(elapsed) => { + assert!( + elapsed < expected_duration, + "Time elapsed for disconnect test {:?}, expecting {:?}ms", + elapsed, + expected_duration + ); + } + Err(e) => { + panic!("unknown error {}", e); + } + } + restore_backup_file(bfilename); + } + + // TODO(johnmichael.burke@) [CPP-111] Need to implement unittest for TCPStream. + // #[test] + // fn connect_to_host_test() { + // } + + // TODO(johnmichael.burke@) [CPP-111] Need to implement unittest for serial. + // #[test] + // fn connect_to_serial_test() { + // } +} diff --git a/console_backend/src/lib.rs b/console_backend/src/lib.rs index 657222bb6..3562861d6 100644 --- a/console_backend/src/lib.rs +++ b/console_backend/src/lib.rs @@ -7,6 +7,7 @@ pub mod console_backend_capnp { } pub mod broadcaster; pub mod common_constants; +pub mod connection; pub mod constants; pub mod date_conv; pub mod errors; diff --git a/console_backend/src/process_messages.rs b/console_backend/src/process_messages.rs index 97b1d5a47..559717445 100644 --- a/console_backend/src/process_messages.rs +++ b/console_backend/src/process_messages.rs @@ -6,6 +6,7 @@ use sbp::{ }; use std::{io::ErrorKind, thread::sleep, time::Duration}; +use crate::connection::Connection; use crate::constants::PAUSE_LOOP_SLEEP_DURATION_MS; use crate::log_panel::handle_log_msg; use crate::main_tab::*; diff --git a/console_backend/src/server.rs b/console_backend/src/server.rs index 04c715251..e7b16e2c4 100644 --- a/console_backend/src/server.rs +++ b/console_backend/src/server.rs @@ -15,12 +15,13 @@ use std::{ }; use crate::cli_options::*; +use crate::connection::ConnectionState; use crate::console_backend_capnp as m; use crate::constants::LOG_WRITER_BUFFER_MESSAGE_COUNT; use crate::errors::*; use crate::log_panel::{splitable_log_formatter, LogLevel, LogPanelWriter}; use crate::output::{CsvLogging, SbpLogging}; -use crate::types::{ClientSender, FlowControl, RealtimeDelay, ServerState, SharedState}; +use crate::types::{ClientSender, FlowControl, RealtimeDelay, SharedState}; use crate::utils::{refresh_loggingbar, refresh_navbar}; /// The backend server @@ -60,18 +61,18 @@ impl ServerEndpoint { /// /// # Parameters /// - `opt`: CLI Options to start specific connection type. -/// - `server_state`: The Server state to start a specific connection. +/// - `connection_state`: The Server state to start a specific connection. /// - `client_send`: Client Sender channel for communication from backend to frontend. /// - `shared_state`: The shared state for validating another connection is not already running. -fn handle_cli(opt: CliOptions, server_state: &ServerState, shared_state: SharedState) { +fn handle_cli(opt: CliOptions, connection_state: &ConnectionState, shared_state: SharedState) { if let Some(opt_input) = opt.input { match opt_input { Input::Tcp { host, port } => { - server_state.connect_to_host(host, port); + connection_state.connect_to_host(host, port); } Input::File { file_in } => { let filename = file_in.display().to_string(); - server_state.connect_to_file(filename, RealtimeDelay::On, opt.exit_after); + connection_state.connect_to_file(filename, RealtimeDelay::On, opt.exit_after); } Input::Serial { serialport, @@ -79,7 +80,7 @@ fn handle_cli(opt: CliOptions, server_state: &ServerState, shared_state: SharedS flow_control, } => { let serialport = serialport.display().to_string(); - server_state.connect_to_serial(serialport, baudrate, flow_control); + connection_state.connect_to_serial(serialport, baudrate, flow_control); } } } @@ -140,7 +141,7 @@ impl Server { inner: client_send_, }; let shared_state = SharedState::new(); - let server_state = ServerState::new(client_send.clone(), shared_state.clone()); + let connection_state = ConnectionState::new(client_send.clone(), shared_state.clone()); let logger = Logger::builder() .buf_size(LOG_WRITER_BUFFER_MESSAGE_COUNT) @@ -152,7 +153,7 @@ impl Server { log::set_boxed_logger(Box::new(logger)).expect("Failed to set logger"); // Handle CLI Opts. - handle_cli(opt, &server_state, shared_state.clone()); + handle_cli(opt, &connection_state, shared_state.clone()); refresh_navbar(&mut client_send.clone(), shared_state.clone()); refresh_loggingbar(&mut client_send.clone(), shared_state.clone()); thread::spawn(move || loop { @@ -196,14 +197,14 @@ impl Server { refresh_navbar(&mut client_send_clone.clone(), shared_state_clone); } m::message::DisconnectRequest(Ok(_)) => { - server_state.disconnect(client_send_clone.clone()); + connection_state.disconnect(client_send_clone.clone()); } m::message::FileRequest(Ok(req)) => { let filename = req .get_filename() .expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); let filename = filename.to_string(); - server_state.connect_to_file( + connection_state.connect_to_file( filename, RealtimeDelay::On, /*close_when_done*/ false, @@ -220,7 +221,7 @@ impl Server { let host = req.get_host().expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); let port = req.get_port(); - server_state.connect_to_host(host.to_string(), port); + connection_state.connect_to_host(host.to_string(), port); } m::message::SerialRequest(Ok(req)) => { let device = @@ -229,7 +230,7 @@ impl Server { let baudrate = req.get_baudrate(); let flow = req.get_flow_control().unwrap(); let flow = FlowControl::from_str(flow).unwrap(); - server_state.connect_to_serial(device, baudrate, flow); + connection_state.connect_to_serial(device, baudrate, flow); } _ => println!("err"), } diff --git a/console_backend/src/types.rs b/console_backend/src/types.rs index a00fa297a..01dbc40c3 100644 --- a/console_backend/src/types.rs +++ b/console_backend/src/types.rs @@ -4,15 +4,14 @@ use crate::errors::*; use crate::log_panel::LogLevel; use crate::output::CsvLogging; use crate::piksi_tools_constants::*; -use crate::process_messages::process_messages; + use crate::utils::{mm_to_m, ms_to_sec, set_connected_frontend}; use anyhow::{Context, Result as AHResult}; use chrono::{DateTime, Utc}; -use crossbeam::channel::{unbounded, Receiver, Sender}; use directories::{ProjectDirs, UserDirs}; use indexmap::set::IndexSet; use lazy_static::lazy_static; -use log::{error, info}; +use log::error; use ordered_float::OrderedFloat; use sbp::codec::dencode::{FramedWrite, IterSinkExt}; use sbp::codec::sbp::SbpEncoder; @@ -37,21 +36,15 @@ use std::{ fmt::Debug, fs, hash::Hash, - io, - net::{SocketAddr, TcpStream, ToSocketAddrs}, ops::Deref, - path::{Path, PathBuf}, + path::PathBuf, str::FromStr, sync::{ self, atomic::{AtomicBool, Ordering::*}, - // mpsc::Sender, - Arc, - Mutex, + Arc, Mutex, }, - thread, - thread::JoinHandle, - time::{Duration, Instant}, + time::Instant, }; pub type Error = std::boxed::Box; @@ -121,7 +114,7 @@ impl Deque { pub trait MessageSender: Debug + Clone + Send { fn send_data(&mut self, msg_bytes: Vec); } -use std::ops::Drop; + #[derive(Debug, Clone)] pub struct ClientSender { pub inner: sync::mpsc::Sender>, @@ -171,122 +164,6 @@ impl Clone for ArcBool { } } -#[derive(Debug)] -pub struct ServerState { - pub handler: Option>, - shared_state: SharedState, - sender: Sender>, - receiver: Receiver>, -} -impl ServerState { - pub fn new(client_send: ClientSender, shared_state: SharedState) -> ServerState { - let (sender, receiver) = unbounded(); - - ServerState { - handler: Some(ServerState::connect_thread( - client_send, - shared_state.clone(), - receiver.clone(), - )), - shared_state, - sender, - receiver, - } - } - - fn connect_thread( - client_send: ClientSender, - shared_state: SharedState, - receiver: Receiver>, - ) -> JoinHandle<()> { - thread::spawn(move || { - let mut conn = None; - while shared_state.clone().is_server_running() { - if let Ok(conn_option) = receiver.recv_timeout(Duration::from_secs_f64( - SERVER_STATE_CONNECTION_LOOP_TIMEOUT_SEC, - )) { - match conn_option { - Some(conn_) => { - conn = Some(conn_); - } - None => { - conn = None; - info!("Disconnected successfully."); - } - } - } - if let Some(conn_) = conn.clone() { - if let Err(e) = - process_messages(conn_, shared_state.clone(), client_send.clone()) - { - error!("unable to process messages, {}", e); - } - if !shared_state.is_running() { - conn = None; - } - shared_state.set_running(false, client_send.clone()); - } - } - }) - } - - /// Helper function for attempting to open a file and process SBP messages from it. - /// - /// # Parameters - /// - `filename`: The path to the filename to be read for SBP messages. - pub fn connect_to_file( - &self, - filename: String, - realtime_delay: RealtimeDelay, - close_when_done: bool, - ) { - let conn = Connection::file(filename, realtime_delay, close_when_done); - self.connect(conn); - } - - /// Helper function for attempting to open a tcp connection and process SBP messages from it. - /// - /// # Parameters - /// - `host`: The host portion of the TCP stream to open. - /// - `port`: The port to be used to open a TCP stream. - pub fn connect_to_host(&self, host: String, port: u16) { - let conn = Connection::tcp(host, port); - self.connect(conn); - } - - /// Helper function for attempting to open a serial port and process SBP messages from it. - /// - /// # Parameters - /// - `device`: The string path corresponding to the serial device to connect with. - /// - `baudrate`: The baudrate to use when communicating with the serial device. - /// - `flow`: The flow control mode to use when communicating with the serial device. - pub fn connect_to_serial(&self, device: String, baudrate: u32, flow: FlowControl) { - let conn = Connection::serial(device, baudrate, flow); - self.connect(conn); - } - - /// Send disconnect signal to server state loop. - pub fn disconnect(&self, client_send: S) { - self.shared_state.set_running(false, client_send); - if let Err(err) = self.sender.try_send(None) { - error!("{}, {}", SERVER_STATE_DISCONNECT_FAILURE, err); - } - } - - /// Helper function to send connection object to server state loop. - fn connect(&self, conn: Connection) { - if let Err(err) = self.sender.try_send(Some(conn)) { - error!("{}, {}", SERVER_STATE_NEW_CONNECTION_FAILURE, err); - } - } -} - -impl Drop for ServerState { - fn drop(&mut self) { - self.shared_state.stop_server_running(); - } -} - #[derive(Debug)] pub struct SharedState(Arc>); @@ -1834,218 +1711,10 @@ impl std::str::FromStr for VelocityUnits { } } -#[derive(Clone)] -pub struct TcpConnection { - name: String, - host: String, - port: u16, -} -impl TcpConnection { - fn new(host: String, port: u16) -> Self { - let name = format!("{}:{}", host, port); - Self { name, host, port } - } - fn socket_addrs(name: String) -> Result { - let socket = &mut name.to_socket_addrs()?; - let socket = if let Some(socket_) = socket.next() { - socket_ - } else { - let e: Box = String::from(TCP_CONNECTION_PARSING_FAILURE).into(); - return Err(e); - }; - Ok(socket) - } -} -impl ConnectionType for TcpConnection { - fn name(&self) -> String { - self.name.clone() - } - fn try_connect( - self, - shared_state: Option, - ) -> Result<(Box, Box)> { - let socket = TcpConnection::socket_addrs(self.name.clone())?; - let rdr = - TcpStream::connect_timeout(&socket, Duration::from_millis(SERIALPORT_READ_TIMEOUT_MS))?; - rdr.set_read_timeout(Some(Duration::from_millis(SERIALPORT_READ_TIMEOUT_MS)))?; - let wtr = rdr.try_clone()?; - info!("Connected to tcp stream!"); - if let Some(shared_state_) = shared_state { - shared_state_.update_tcp_history(self.host, self.port); - } - Ok((Box::new(rdr), Box::new(wtr))) - } -} - -#[derive(Clone)] -pub struct SerialConnection { - pub name: String, - pub device: String, - pub baudrate: u32, - pub flow: FlowControl, -} -impl SerialConnection { - fn new(device: String, baudrate: u32, flow: FlowControl) -> Self { - Self { - name: format!("{} @{}", device, baudrate), - device, - baudrate, - flow, - } - } -} -impl ConnectionType for SerialConnection { - fn name(&self) -> String { - self.name.clone() - } - fn try_connect( - self, - _shared_state: Option, - ) -> Result<(Box, Box)> { - let rdr = serialport::new(self.device, self.baudrate) - .flow_control(*self.flow) - .timeout(Duration::from_millis(SERIALPORT_READ_TIMEOUT_MS)) - .open()?; - let wtr = rdr.try_clone()?; - info!("Opened serial port successfully!"); - Ok((Box::new(rdr), Box::new(wtr))) - } -} - -#[derive(Clone)] -pub struct FileConnection { - pub name: String, - pub filepath: PathBuf, - close_when_done: bool, - realtime_delay: RealtimeDelay, -} -impl FileConnection { - fn new>( - filepath: P, - close_when_done: bool, - realtime_delay: RealtimeDelay, - ) -> Self { - let filepath = PathBuf::from(filepath.as_ref()); - let name = if let Some(path) = filepath.file_name() { - path - } else { - filepath.as_os_str() - }; - let name: &str = &*name.to_string_lossy(); - Self { - name: String::from(name), - filepath, - close_when_done, - realtime_delay, - } - } -} -impl ConnectionType for FileConnection { - fn name(&self) -> String { - self.name.clone() - } - fn close_when_done(&self) -> bool { - self.close_when_done - } - fn realtime_delay(&self) -> RealtimeDelay { - self.realtime_delay - } - fn try_connect( - self, - shared_state: Option, - ) -> Result<(Box, Box)> { - let rdr = fs::File::open(&self.filepath)?; - let wtr = io::sink(); - info!("Opened file successfully!"); - if let Some(shared_state_) = shared_state { - shared_state_.update_file_history(self.filepath.to_string_lossy().to_string()); - } - Ok((Box::new(rdr), Box::new(wtr))) - } -} - -trait ConnectionType { - fn close_when_done(&self) -> bool { - false - } - fn name(&self) -> String; - fn realtime_delay(&self) -> RealtimeDelay { - RealtimeDelay::Off - } - fn try_connect( - self, - shared_state: Option, - ) -> Result<(Box, Box)>; -} - -#[derive(Clone)] -pub enum Connection { - Tcp(TcpConnection), - File(FileConnection), - Serial(SerialConnection), -} -impl Connection { - pub fn tcp(host: String, port: u16) -> Self { - Connection::Tcp(TcpConnection::new(host, port)) - } - - pub fn serial(device: String, baudrate: u32, flow: FlowControl) -> Self { - Connection::Serial(SerialConnection::new(device, baudrate, flow)) - } - - pub fn file(filename: String, realtime_delay: RealtimeDelay, close_when_done: bool) -> Self { - Connection::File(FileConnection::new( - filename, - close_when_done, - realtime_delay, - )) - } - pub fn close_when_done(&self) -> bool { - match self { - Connection::Tcp(conn) => conn.close_when_done(), - Connection::File(conn) => conn.close_when_done(), - Connection::Serial(conn) => conn.close_when_done(), - } - } - pub fn name(&self) -> String { - match self { - Connection::Tcp(conn) => conn.name(), - Connection::File(conn) => conn.name(), - Connection::Serial(conn) => conn.name(), - } - } - pub fn realtime_delay(&self) -> RealtimeDelay { - match self { - Connection::Tcp(conn) => conn.realtime_delay(), - Connection::File(conn) => conn.realtime_delay(), - Connection::Serial(conn) => conn.realtime_delay(), - } - } - pub fn try_connect( - &self, - shared_state: Option, - ) -> Result<(Box, Box)> { - match self { - Connection::Tcp(conn) => conn.clone().try_connect(shared_state), - Connection::File(conn) => conn.clone().try_connect(shared_state), - Connection::Serial(conn) => conn.clone().try_connect(shared_state), - } - } -} - #[cfg(test)] mod tests { use super::*; use serial_test::serial; - use std::{ - sync::mpsc, - thread::sleep, - time::{Duration, SystemTime}, - }; - const TEST_FILEPATH: &str = "./tests/data/piksi-relay-1min.sbp"; - const TEST_SHORT_FILEPATH: &str = "./tests/data/piksi-relay.sbp"; - const SBP_FILE_SHORT_DURATION_SEC: f64 = 27.1; - const DELAY_BEFORE_CHECKING_APP_STARTED_IN_MS: u64 = 150; pub mod data_directories { #![allow(dead_code)] @@ -2182,141 +1851,4 @@ mod tests { assert_eq!(conn_history.files().len(), MAX_CONNECTION_HISTORY as usize); restore_backup_file(bfilename); } - - fn receive_thread(client_recv: mpsc::Receiver>) -> JoinHandle<()> { - thread::spawn(move || { - let mut iter_count = 0; - - loop { - if client_recv.recv().is_err() { - break; - } - - iter_count += 1; - } - assert!(iter_count > 0); - }) - } - - #[test] - #[serial] - fn connect_to_file_test() { - let bfilename = filename(); - backup_file(bfilename.clone()); - let shared_state = SharedState::new(); - let (client_send_, client_receive) = mpsc::channel::>(); - let client_send = ClientSender { - inner: client_send_, - }; - let server_state = ServerState::new(client_send, shared_state.clone()); - let filename = TEST_SHORT_FILEPATH.to_string(); - receive_thread(client_receive); - assert!(!shared_state.is_running()); - server_state.connect_to_file( - filename, - RealtimeDelay::On, - /*close_when_done = */ true, - ); - sleep(Duration::from_millis( - DELAY_BEFORE_CHECKING_APP_STARTED_IN_MS, - )); - assert!(shared_state.is_running()); - sleep(Duration::from_secs_f64(SBP_FILE_SHORT_DURATION_SEC)); - assert!(!shared_state.is_running()); - restore_backup_file(bfilename); - } - - #[test] - #[serial] - fn pause_via_connect_to_file_test() { - let bfilename = filename(); - backup_file(bfilename.clone()); - let shared_state = SharedState::new(); - let (client_send_, client_receive) = mpsc::channel::>(); - let client_send = ClientSender { - inner: client_send_, - }; - let server_state = ServerState::new(client_send, shared_state.clone()); - let filename = TEST_SHORT_FILEPATH.to_string(); - receive_thread(client_receive); - assert!(!shared_state.is_running()); - server_state.connect_to_file( - filename, - RealtimeDelay::On, - /*close_when_done = */ true, - ); - sleep(Duration::from_millis( - DELAY_BEFORE_CHECKING_APP_STARTED_IN_MS, - )); - assert!(shared_state.is_running()); - shared_state.set_paused(true); - sleep(Duration::from_secs_f64(SBP_FILE_SHORT_DURATION_SEC)); - assert!(shared_state.is_running()); - shared_state.set_paused(false); - sleep(Duration::from_secs_f64(SBP_FILE_SHORT_DURATION_SEC)); - assert!(!shared_state.is_running()); - restore_backup_file(bfilename); - println!("done"); - } - - #[test] - #[serial] - fn disconnect_via_connect_to_file_test() { - let bfilename = filename(); - backup_file(bfilename.clone()); - let shared_state = SharedState::new(); - let (client_send_, client_receive) = mpsc::channel::>(); - let client_send = ClientSender { - inner: client_send_, - }; - let server_state = ServerState::new(client_send.clone(), shared_state.clone()); - let filename = TEST_FILEPATH.to_string(); - let expected_duration = Duration::from_secs_f64(SERVER_STATE_CONNECTION_LOOP_TIMEOUT_SEC) - + Duration::from_millis(100); - let handle = receive_thread(client_receive); - assert!(!shared_state.is_running()); - { - server_state.connect_to_file( - filename, - RealtimeDelay::On, - /*close_when_done = */ true, - ); - } - - sleep(Duration::from_millis(5)); - assert!(shared_state.is_running()); - let now = SystemTime::now(); - sleep(Duration::from_millis(1)); - server_state.disconnect(client_send.clone()); - sleep(Duration::from_millis(10)); - assert!(!shared_state.is_running()); - shared_state.stop_server_running(); - drop(client_send); - assert!(handle.join().is_ok()); - - match now.elapsed() { - Ok(elapsed) => { - assert!( - elapsed < expected_duration, - "Time elapsed for disconnect test {:?}, expecting {:?}ms", - elapsed, - expected_duration - ); - } - Err(e) => { - panic!("unknown error {}", e); - } - } - restore_backup_file(bfilename); - } - - // TODO(johnmichael.burke@) [CPP-111] Need to implement unittest for TCPStream. - // #[test] - // fn connect_to_host_test() { - // } - - // TODO(johnmichael.burke@) [CPP-111] Need to implement unittest for serial. - // #[test] - // fn connect_to_serial_test() { - // } } diff --git a/console_backend/tests/mem_benches.rs b/console_backend/tests/mem_benches.rs index ad4b68222..44ecf268c 100644 --- a/console_backend/tests/mem_benches.rs +++ b/console_backend/tests/mem_benches.rs @@ -12,8 +12,9 @@ mod mem_bench_impl { use sysinfo::{get_current_pid, ProcessExt, System, SystemExt}; use console_backend::{ + connection::Connection, process_messages, - types::{ClientSender, Connection, RealtimeDelay, SharedState}, + types::{ClientSender, RealtimeDelay, SharedState}, }; const BENCH_FILEPATH: &str = "./tests/data/piksi-relay-1min.sbp"; From 23cc00d38d961d7b6cd004c48faa2cb180efef67 Mon Sep 17 00:00:00 2001 From: John Michael Burke Date: Mon, 12 Jul 2021 11:00:47 -0700 Subject: [PATCH 5/6] Add test_common module. --- Makefile.toml | 8 +++-- console_backend/Cargo.toml | 5 ++- console_backend/src/connection.rs | 54 +--------------------------- console_backend/src/lib.rs | 58 +++++++++++++++++++++++++++++++ console_backend/src/types.rs | 52 +-------------------------- 5 files changed, 70 insertions(+), 107 deletions(-) diff --git a/Makefile.toml b/Makefile.toml index cf4476000..795892036 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -247,14 +247,18 @@ hdiutil create -volname SwiftNavConsole -srcfolder main.app -ov -format UDZO 'sw ''' [tasks.qml-format] -env = { QT_APP = { script = ["conda run -n $CONDA_ENV python utils/echo-qt-dir.py"] } } +env = { QT_APP = { script = [ + "conda run -n $CONDA_ENV python utils/echo-qt-dir.py", +] } } script_runner = "@shell" script = ''' ${QT_APP}/Qt/bin/qmlformat -i $QML_FILES ''' [tasks.qml-lint] -env = { QT_APP = { script = ["conda run -n $CONDA_ENV python utils/echo-qt-dir.py"] } } +env = { QT_APP = { script = [ + "conda run -n $CONDA_ENV python utils/echo-qt-dir.py", +] } } script_runner = "@shell" script = ''' ${QT_APP}/Qt/bin/qmllint -I ${QT_APP}/../PySide2/Qt/qml/ -I resources/ $QML_FILES diff --git a/console_backend/Cargo.toml b/console_backend/Cargo.toml index 9c3e55b18..c9294db2f 100644 --- a/console_backend/Cargo.toml +++ b/console_backend/Cargo.toml @@ -17,7 +17,10 @@ chrono = { version = "0.4", features = ["serde"] } csv = "1" paste = "1" pyo3 = { version = "0.13", features = ["extension-module"], optional = true } -sbp = { git = "https://github.com/swift-nav/libsbp.git", rev = "ce5e3d14ae4284c53982bc3fd79a2fa15976ff9f", features = ["json", "swiftnav-rs"] } +sbp = { git = "https://github.com/swift-nav/libsbp.git", rev = "ce5e3d14ae4284c53982bc3fd79a2fa15976ff9f", features = [ + "json", + "swiftnav-rs", +] } serde = { version = "1.0.123", features = ["derive"] } tempfile = "3.2.0" ordered-float = "2.0" diff --git a/console_backend/src/connection.rs b/console_backend/src/connection.rs index 34e0380e8..61fb466c5 100644 --- a/console_backend/src/connection.rs +++ b/console_backend/src/connection.rs @@ -317,7 +317,7 @@ impl Drop for ConnectionState { #[cfg(test)] mod tests { use super::*; - use directories::UserDirs; + use crate::test_common::{backup_file, filename, restore_backup_file}; use serial_test::serial; use std::{ sync::mpsc, @@ -329,57 +329,6 @@ mod tests { const SBP_FILE_SHORT_DURATION_SEC: f64 = 27.1; const DELAY_BEFORE_CHECKING_APP_STARTED_IN_MS: u64 = 150; - pub mod data_directories { - #![allow(dead_code)] - pub const LINUX: &str = ".local/share/swift_navigation_console"; - pub const MACOS: &str = - "Library/Application Support/com.swift-nav.swift-nav.swift_navigation_console"; - pub const WINDOWS: &str = "AppData\\Local\\swift-nav\\swift_navigation_console\\data"; - } - - fn filename() -> PathBuf { - let user_dirs = UserDirs::new().unwrap(); - let home_dir = user_dirs.home_dir(); - #[cfg(target_os = "linux")] - { - home_dir - .join(data_directories::LINUX) - .join(CONNECTION_HISTORY_FILENAME) - } - - #[cfg(target_os = "macos")] - { - home_dir - .join(data_directories::MACOS) - .join(CONNECTION_HISTORY_FILENAME) - } - #[cfg(target_os = "windows")] - { - home_dir - .join(data_directories::WINDOWS) - .join(CONNECTION_HISTORY_FILENAME) - } - } - - fn backup_file(filename: PathBuf) { - if filename.exists() { - let mut backup_filename = filename.clone(); - backup_filename.set_extension("backup"); - fs::rename(filename, backup_filename).unwrap(); - } - } - - fn restore_backup_file(filename: PathBuf) { - let mut backup_filename = filename.clone(); - backup_filename.set_extension("backup"); - if filename.exists() { - fs::remove_file(filename.clone()).unwrap(); - } - if backup_filename.exists() { - fs::rename(backup_filename, filename).unwrap(); - } - } - fn receive_thread(client_recv: mpsc::Receiver>) -> JoinHandle<()> { thread::spawn(move || { let mut iter_count = 0; @@ -453,7 +402,6 @@ mod tests { sleep(Duration::from_secs_f64(SBP_FILE_SHORT_DURATION_SEC)); assert!(!shared_state.is_running()); restore_backup_file(bfilename); - println!("done"); } #[test] diff --git a/console_backend/src/lib.rs b/console_backend/src/lib.rs index 3562861d6..44bf34a52 100644 --- a/console_backend/src/lib.rs +++ b/console_backend/src/lib.rs @@ -29,3 +29,61 @@ pub mod status_bar; pub mod tracking_signals_tab; pub mod types; pub mod utils; + +#[cfg(test)] +pub mod test_common { + use crate::constants::*; + use directories::UserDirs; + use std::{fs, path::PathBuf}; + + pub mod data_directories { + #![allow(dead_code)] + pub const LINUX: &str = ".local/share/swift_navigation_console"; + pub const MACOS: &str = + "Library/Application Support/com.swift-nav.swift-nav.swift_navigation_console"; + pub const WINDOWS: &str = "AppData\\Local\\swift-nav\\swift_navigation_console\\data"; + } + + pub fn filename() -> PathBuf { + let user_dirs = UserDirs::new().unwrap(); + let home_dir = user_dirs.home_dir(); + #[cfg(target_os = "linux")] + { + home_dir + .join(data_directories::LINUX) + .join(CONNECTION_HISTORY_FILENAME) + } + + #[cfg(target_os = "macos")] + { + home_dir + .join(data_directories::MACOS) + .join(CONNECTION_HISTORY_FILENAME) + } + #[cfg(target_os = "windows")] + { + home_dir + .join(data_directories::WINDOWS) + .join(CONNECTION_HISTORY_FILENAME) + } + } + + pub fn backup_file(filename: PathBuf) { + if filename.exists() { + let mut backup_filename = filename.clone(); + backup_filename.set_extension("backup"); + fs::rename(filename, backup_filename).unwrap(); + } + } + + pub fn restore_backup_file(filename: PathBuf) { + let mut backup_filename = filename.clone(); + backup_filename.set_extension("backup"); + if filename.exists() { + fs::remove_file(filename.clone()).unwrap(); + } + if backup_filename.exists() { + fs::rename(backup_filename, filename).unwrap(); + } + } +} diff --git a/console_backend/src/types.rs b/console_backend/src/types.rs index 01dbc40c3..9d74bd012 100644 --- a/console_backend/src/types.rs +++ b/console_backend/src/types.rs @@ -1714,16 +1714,9 @@ impl std::str::FromStr for VelocityUnits { #[cfg(test)] mod tests { use super::*; + use crate::test_common::{backup_file, data_directories, filename, restore_backup_file}; use serial_test::serial; - pub mod data_directories { - #![allow(dead_code)] - pub const LINUX: &str = ".local/share/swift_navigation_console"; - pub const MACOS: &str = - "Library/Application Support/com.swift-nav.swift-nav.swift_navigation_console"; - pub const WINDOWS: &str = "AppData\\Local\\swift-nav\\swift_navigation_console\\data"; - } - #[test] fn create_data_dir_test() { create_data_dir().unwrap(); @@ -1744,49 +1737,6 @@ mod tests { } } - fn filename() -> PathBuf { - let user_dirs = UserDirs::new().unwrap(); - let home_dir = user_dirs.home_dir(); - #[cfg(target_os = "linux")] - { - home_dir - .join(data_directories::LINUX) - .join(CONNECTION_HISTORY_FILENAME) - } - - #[cfg(target_os = "macos")] - { - home_dir - .join(data_directories::MACOS) - .join(CONNECTION_HISTORY_FILENAME) - } - #[cfg(target_os = "windows")] - { - home_dir - .join(data_directories::WINDOWS) - .join(CONNECTION_HISTORY_FILENAME) - } - } - - fn backup_file(filename: PathBuf) { - if filename.exists() { - let mut backup_filename = filename.clone(); - backup_filename.set_extension("backup"); - fs::rename(filename, backup_filename).unwrap(); - } - } - - fn restore_backup_file(filename: PathBuf) { - let mut backup_filename = filename.clone(); - backup_filename.set_extension("backup"); - if filename.exists() { - fs::remove_file(filename.clone()).unwrap(); - } - if backup_filename.exists() { - fs::rename(backup_filename, filename).unwrap(); - } - } - #[test] #[serial] fn connection_history_save_test() { From b3f9c2db27e7e72df58f161833d8887222bcba65 Mon Sep 17 00:00:00 2001 From: John Michael Burke Date: Mon, 12 Jul 2021 13:50:59 -0700 Subject: [PATCH 6/6] Add a few more unittests. --- console_backend/src/connection.rs | 38 +++++++++++++++++++++++++++++++ console_backend/src/types.rs | 2 +- 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/console_backend/src/connection.rs b/console_backend/src/connection.rs index 61fb466c5..e13314050 100644 --- a/console_backend/src/connection.rs +++ b/console_backend/src/connection.rs @@ -320,6 +320,7 @@ mod tests { use crate::test_common::{backup_file, filename, restore_backup_file}; use serial_test::serial; use std::{ + str::FromStr, sync::mpsc, thread::sleep, time::{Duration, SystemTime}, @@ -329,6 +330,43 @@ mod tests { const SBP_FILE_SHORT_DURATION_SEC: f64 = 27.1; const DELAY_BEFORE_CHECKING_APP_STARTED_IN_MS: u64 = 150; + #[test] + fn create_tcp() { + let host = String::from("0.0.0.0"); + let port = 55555; + let conn = Connection::tcp(host.clone(), port); + assert_eq!(conn.name(), format!("{}:{}", host, port)); + assert!(!conn.close_when_done()); + assert_eq!(conn.realtime_delay(), RealtimeDelay::Off); + } + + #[test] + fn create_file() { + let filepath = String::from(TEST_FILEPATH); + let realtime_delay_on = RealtimeDelay::On; + let realtime_delay_off = RealtimeDelay::Off; + let close_when_done_false = false; + let close_when_done_true = true; + let conn = Connection::file(filepath.clone(), realtime_delay_on, close_when_done_true); + assert_eq!(conn.name(), String::from("piksi-relay-1min.sbp")); + assert!(conn.close_when_done()); + assert_eq!(conn.realtime_delay(), RealtimeDelay::On); + let conn = Connection::file(filepath, realtime_delay_off, close_when_done_false); + assert!(!conn.close_when_done()); + assert_eq!(conn.realtime_delay(), RealtimeDelay::Off); + } + + #[test] + fn create_serial() { + let device = String::from("/dev/ttyUSB0"); + let baudrate = 115200; + let flow = FlowControl::from_str(FLOW_CONTROL_NONE).unwrap(); + let conn = Connection::serial(device.clone(), baudrate, flow); + assert_eq!(conn.name(), format!("{} @{}", device, baudrate)); + assert!(!conn.close_when_done()); + assert_eq!(conn.realtime_delay(), RealtimeDelay::Off); + } + fn receive_thread(client_recv: mpsc::Receiver>) -> JoinHandle<()> { thread::spawn(move || { let mut iter_count = 0; diff --git a/console_backend/src/types.rs b/console_backend/src/types.rs index 9d74bd012..e698385dc 100644 --- a/console_backend/src/types.rs +++ b/console_backend/src/types.rs @@ -492,7 +492,7 @@ impl SolutionVelocityTabState { } // Main Tab Types. -#[derive(Clone, Copy)] +#[derive(Clone, Copy, Debug, PartialEq)] pub enum RealtimeDelay { On, Off,