diff --git a/Makefile.toml b/Makefile.toml index cf4476000..795892036 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -247,14 +247,18 @@ hdiutil create -volname SwiftNavConsole -srcfolder main.app -ov -format UDZO 'sw ''' [tasks.qml-format] -env = { QT_APP = { script = ["conda run -n $CONDA_ENV python utils/echo-qt-dir.py"] } } +env = { QT_APP = { script = [ + "conda run -n $CONDA_ENV python utils/echo-qt-dir.py", +] } } script_runner = "@shell" script = ''' ${QT_APP}/Qt/bin/qmlformat -i $QML_FILES ''' [tasks.qml-lint] -env = { QT_APP = { script = ["conda run -n $CONDA_ENV python utils/echo-qt-dir.py"] } } +env = { QT_APP = { script = [ + "conda run -n $CONDA_ENV python utils/echo-qt-dir.py", +] } } script_runner = "@shell" script = ''' ${QT_APP}/Qt/bin/qmllint -I ${QT_APP}/../PySide2/Qt/qml/ -I resources/ $QML_FILES diff --git a/console_backend/Cargo.toml b/console_backend/Cargo.toml index 9c3e55b18..c9294db2f 100644 --- a/console_backend/Cargo.toml +++ b/console_backend/Cargo.toml @@ -17,7 +17,10 @@ chrono = { version = "0.4", features = ["serde"] } csv = "1" paste = "1" pyo3 = { version = "0.13", features = ["extension-module"], optional = true } -sbp = { git = "https://github.com/swift-nav/libsbp.git", rev = "ce5e3d14ae4284c53982bc3fd79a2fa15976ff9f", features = ["json", "swiftnav-rs"] } +sbp = { git = "https://github.com/swift-nav/libsbp.git", rev = "ce5e3d14ae4284c53982bc3fd79a2fa15976ff9f", features = [ + "json", + "swiftnav-rs", +] } serde = { version = "1.0.123", features = ["derive"] } tempfile = "3.2.0" ordered-float = "2.0" diff --git a/console_backend/benches/cpu_benches.rs b/console_backend/benches/cpu_benches.rs index 5780a1d6a..1c526a034 100644 --- a/console_backend/benches/cpu_benches.rs +++ b/console_backend/benches/cpu_benches.rs @@ -11,8 +11,9 @@ use std::{ extern crate console_backend; use console_backend::{ + connection::Connection, process_messages, - types::{ClientSender, Connection, RealtimeDelay, SharedState}, + types::{ClientSender, RealtimeDelay, SharedState}, }; const BENCH_FILEPATH: &str = "./tests/data/piksi-relay.sbp"; @@ -61,8 +62,12 @@ fn run_process_messages(file_in_name: &str, failure: bool) { inner: client_send_, }; shared_state.set_running(true, client_send.clone()); - let conn = Connection::file(file_in_name.into()).unwrap(); - process_messages::process_messages(conn, shared_state, client_send, RealtimeDelay::Off); + let conn = Connection::file( + file_in_name.into(), + RealtimeDelay::Off, + /*close_when_done=*/ true, + ); + process_messages::process_messages(conn, shared_state, client_send).unwrap(); } recv_thread.join().expect("join should succeed"); } diff --git a/console_backend/src/bin/fileio.rs b/console_backend/src/bin/fileio.rs index 496898179..33266ae65 100644 --- a/console_backend/src/bin/fileio.rs +++ b/console_backend/src/bin/fileio.rs @@ -70,7 +70,7 @@ fn main() -> Result<()> { dest, input, } => { - let (rdr, wtr) = input.into_conn()?.into_io(); + let (rdr, wtr) = input.into_conn().try_connect(/*shared_state=*/ None)?; let sender = MsgSender::new(wtr); scope(|s| { s.spawn(|_| run(rdr)); @@ -88,7 +88,7 @@ fn main() -> Result<()> { dest, input, } => { - let (rdr, wtr) = input.into_conn()?.into_io(); + let (rdr, wtr) = input.into_conn().try_connect(/*shared_state=*/ None)?; let sender = MsgSender::new(wtr); scope(|s| { s.spawn(|_| run(rdr)); @@ -104,7 +104,7 @@ fn main() -> Result<()> { .unwrap() } Opts::List { path, input } => { - let (rdr, wtr) = input.into_conn()?.into_io(); + let (rdr, wtr) = input.into_conn().try_connect(/*shared_state=*/ None)?; let sender = MsgSender::new(wtr); scope(|s| { s.spawn(|_| run(rdr)); @@ -117,7 +117,7 @@ fn main() -> Result<()> { .unwrap() } Opts::Delete { path, input } => { - let (rdr, wtr) = input.into_conn()?.into_io(); + let (rdr, wtr) = input.into_conn().try_connect(/*shared_state=*/ None)?; let sender = MsgSender::new(wtr); scope(|s| { s.spawn(|_| run(rdr)); diff --git a/console_backend/src/cli_options.rs b/console_backend/src/cli_options.rs index e6e371e31..f7428d87b 100644 --- a/console_backend/src/cli_options.rs +++ b/console_backend/src/cli_options.rs @@ -11,7 +11,7 @@ use crate::log_panel::LogLevel; use crate::types::FlowControl; use crate::{ common_constants::{SbpLogging, Tabs}, - types::Connection, + connection::Connection, }; #[derive(Debug)] @@ -177,7 +177,7 @@ pub enum Input { } impl Input { - pub fn into_conn(self) -> crate::types::Result { + pub fn into_conn(self) -> Connection { match self { Input::Tcp { host, port } => Connection::tcp(host, port), Input::Serial { @@ -185,7 +185,11 @@ impl Input { baudrate, flow_control, } => Connection::serial(serialport.to_string_lossy().into(), baudrate, flow_control), - Input::File { file_in } => Connection::file(file_in.to_string_lossy().into()), + Input::File { file_in } => Connection::file( + file_in.to_string_lossy().into(), + crate::types::RealtimeDelay::On, + /*close_when_done=*/ false, + ), } } } diff --git a/console_backend/src/connection.rs b/console_backend/src/connection.rs new file mode 100644 index 000000000..e13314050 --- /dev/null +++ b/console_backend/src/connection.rs @@ -0,0 +1,505 @@ +use crate::constants::*; +use crate::errors::*; +use crate::process_messages::process_messages; +use crate::types::*; +use chrono::{DateTime, Utc}; +use crossbeam::channel::{unbounded, Receiver, Sender}; +use log::{error, info}; +use std::{ + fmt::Debug, + fs, io, + net::{SocketAddr, TcpStream, ToSocketAddrs}, + ops::Drop, + path::{Path, PathBuf}, + thread, + thread::JoinHandle, + time::Duration, +}; + +pub type Error = std::boxed::Box; +pub type Result = std::result::Result; +pub type UtcDateTime = DateTime; + +#[derive(Clone)] +pub struct TcpConnection { + name: String, + host: String, + port: u16, +} +impl TcpConnection { + fn new(host: String, port: u16) -> Self { + let name = format!("{}:{}", host, port); + Self { name, host, port } + } + fn socket_addrs(name: String) -> Result { + let socket = &mut name.to_socket_addrs()?; + let socket = if let Some(socket_) = socket.next() { + socket_ + } else { + let e: Box = String::from(TCP_CONNECTION_PARSING_FAILURE).into(); + return Err(e); + }; + Ok(socket) + } + fn name(&self) -> String { + self.name.clone() + } + fn try_connect( + self, + shared_state: Option, + ) -> Result<(Box, Box)> { + let socket = TcpConnection::socket_addrs(self.name.clone())?; + let rdr = + TcpStream::connect_timeout(&socket, Duration::from_millis(SERIALPORT_READ_TIMEOUT_MS))?; + rdr.set_read_timeout(Some(Duration::from_millis(SERIALPORT_READ_TIMEOUT_MS)))?; + let wtr = rdr.try_clone()?; + info!("Connected to tcp stream!"); + if let Some(shared_state_) = shared_state { + shared_state_.update_tcp_history(self.host, self.port); + } + Ok((Box::new(rdr), Box::new(wtr))) + } +} + +#[derive(Clone)] +pub struct SerialConnection { + pub name: String, + pub device: String, + pub baudrate: u32, + pub flow: FlowControl, +} +impl SerialConnection { + fn new(device: String, baudrate: u32, flow: FlowControl) -> Self { + Self { + name: format!("{} @{}", device, baudrate), + device, + baudrate, + flow, + } + } + fn name(&self) -> String { + self.name.clone() + } + fn try_connect( + self, + _shared_state: Option, + ) -> Result<(Box, Box)> { + let rdr = serialport::new(self.device, self.baudrate) + .flow_control(*self.flow) + .timeout(Duration::from_millis(SERIALPORT_READ_TIMEOUT_MS)) + .open()?; + let wtr = rdr.try_clone()?; + info!("Opened serial port successfully!"); + Ok((Box::new(rdr), Box::new(wtr))) + } +} + +#[derive(Clone)] +pub struct FileConnection { + pub name: String, + pub filepath: PathBuf, + close_when_done: bool, + realtime_delay: RealtimeDelay, +} +impl FileConnection { + fn new>( + filepath: P, + close_when_done: bool, + realtime_delay: RealtimeDelay, + ) -> Self { + let filepath = PathBuf::from(filepath.as_ref()); + let name = if let Some(path) = filepath.file_name() { + path + } else { + filepath.as_os_str() + }; + let name: &str = &*name.to_string_lossy(); + Self { + name: String::from(name), + filepath, + close_when_done, + realtime_delay, + } + } + + fn name(&self) -> String { + self.name.clone() + } + fn close_when_done(&self) -> bool { + self.close_when_done + } + fn realtime_delay(&self) -> RealtimeDelay { + self.realtime_delay + } + fn try_connect( + self, + shared_state: Option, + ) -> Result<(Box, Box)> { + let rdr = fs::File::open(&self.filepath)?; + let wtr = io::sink(); + info!("Opened file successfully!"); + if let Some(shared_state_) = shared_state { + shared_state_.update_file_history(self.filepath.to_string_lossy().to_string()); + } + Ok((Box::new(rdr), Box::new(wtr))) + } +} + +#[derive(Clone)] +pub enum Connection { + Tcp(TcpConnection), + File(FileConnection), + Serial(SerialConnection), +} +impl Connection { + pub fn tcp(host: String, port: u16) -> Self { + Connection::Tcp(TcpConnection::new(host, port)) + } + + pub fn serial(device: String, baudrate: u32, flow: FlowControl) -> Self { + Connection::Serial(SerialConnection::new(device, baudrate, flow)) + } + + pub fn file(filename: String, realtime_delay: RealtimeDelay, close_when_done: bool) -> Self { + Connection::File(FileConnection::new( + filename, + close_when_done, + realtime_delay, + )) + } + pub fn close_when_done(&self) -> bool { + match self { + Connection::File(conn) => conn.close_when_done(), + Connection::Tcp(_) | Connection::Serial(_) => false, + } + } + pub fn name(&self) -> String { + match self { + Connection::Tcp(conn) => conn.name(), + Connection::File(conn) => conn.name(), + Connection::Serial(conn) => conn.name(), + } + } + pub fn realtime_delay(&self) -> RealtimeDelay { + match self { + Connection::File(conn) => conn.realtime_delay(), + Connection::Tcp(_) | Connection::Serial(_) => RealtimeDelay::Off, + } + } + pub fn try_connect( + &self, + shared_state: Option, + ) -> Result<(Box, Box)> { + match self { + Connection::Tcp(conn) => conn.clone().try_connect(shared_state), + Connection::File(conn) => conn.clone().try_connect(shared_state), + Connection::Serial(conn) => conn.clone().try_connect(shared_state), + } + } +} + +#[derive(Debug)] +pub struct ConnectionState { + pub handler: Option>, + shared_state: SharedState, + sender: Sender>, + receiver: Receiver>, +} +impl ConnectionState { + pub fn new(client_send: ClientSender, shared_state: SharedState) -> ConnectionState { + let (sender, receiver) = unbounded(); + + ConnectionState { + handler: Some(ConnectionState::connect_thread( + client_send, + shared_state.clone(), + receiver.clone(), + )), + shared_state, + sender, + receiver, + } + } + + fn connect_thread( + client_send: ClientSender, + shared_state: SharedState, + receiver: Receiver>, + ) -> JoinHandle<()> { + thread::spawn(move || { + let mut conn = None; + while shared_state.clone().is_server_running() { + if let Ok(conn_option) = receiver.recv_timeout(Duration::from_secs_f64( + SERVER_STATE_CONNECTION_LOOP_TIMEOUT_SEC, + )) { + match conn_option { + Some(conn_) => { + conn = Some(conn_); + } + None => { + conn = None; + info!("Disconnected successfully."); + } + } + } + if let Some(conn_) = conn.clone() { + if let Err(e) = + process_messages(conn_, shared_state.clone(), client_send.clone()) + { + error!("unable to process messages, {}", e); + } + if !shared_state.is_running() { + conn = None; + } + shared_state.set_running(false, client_send.clone()); + } + } + }) + } + + /// Helper function for attempting to open a file and process SBP messages from it. + /// + /// # Parameters + /// - `filename`: The path to the filename to be read for SBP messages. + pub fn connect_to_file( + &self, + filename: String, + realtime_delay: RealtimeDelay, + close_when_done: bool, + ) { + let conn = Connection::file(filename, realtime_delay, close_when_done); + self.connect(conn); + } + + /// Helper function for attempting to open a tcp connection and process SBP messages from it. + /// + /// # Parameters + /// - `host`: The host portion of the TCP stream to open. + /// - `port`: The port to be used to open a TCP stream. + pub fn connect_to_host(&self, host: String, port: u16) { + let conn = Connection::tcp(host, port); + self.connect(conn); + } + + /// Helper function for attempting to open a serial port and process SBP messages from it. + /// + /// # Parameters + /// - `device`: The string path corresponding to the serial device to connect with. + /// - `baudrate`: The baudrate to use when communicating with the serial device. + /// - `flow`: The flow control mode to use when communicating with the serial device. + pub fn connect_to_serial(&self, device: String, baudrate: u32, flow: FlowControl) { + let conn = Connection::serial(device, baudrate, flow); + self.connect(conn); + } + + /// Send disconnect signal to server state loop. + pub fn disconnect(&self, client_send: S) { + self.shared_state.set_running(false, client_send); + if let Err(err) = self.sender.try_send(None) { + error!("{}, {}", SERVER_STATE_DISCONNECT_FAILURE, err); + } + } + + /// Helper function to send connection object to server state loop. + fn connect(&self, conn: Connection) { + if let Err(err) = self.sender.try_send(Some(conn)) { + error!("{}, {}", SERVER_STATE_NEW_CONNECTION_FAILURE, err); + } + } +} + +impl Drop for ConnectionState { + fn drop(&mut self) { + self.shared_state.stop_server_running(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_common::{backup_file, filename, restore_backup_file}; + use serial_test::serial; + use std::{ + str::FromStr, + sync::mpsc, + thread::sleep, + time::{Duration, SystemTime}, + }; + const TEST_FILEPATH: &str = "./tests/data/piksi-relay-1min.sbp"; + const TEST_SHORT_FILEPATH: &str = "./tests/data/piksi-relay.sbp"; + const SBP_FILE_SHORT_DURATION_SEC: f64 = 27.1; + const DELAY_BEFORE_CHECKING_APP_STARTED_IN_MS: u64 = 150; + + #[test] + fn create_tcp() { + let host = String::from("0.0.0.0"); + let port = 55555; + let conn = Connection::tcp(host.clone(), port); + assert_eq!(conn.name(), format!("{}:{}", host, port)); + assert!(!conn.close_when_done()); + assert_eq!(conn.realtime_delay(), RealtimeDelay::Off); + } + + #[test] + fn create_file() { + let filepath = String::from(TEST_FILEPATH); + let realtime_delay_on = RealtimeDelay::On; + let realtime_delay_off = RealtimeDelay::Off; + let close_when_done_false = false; + let close_when_done_true = true; + let conn = Connection::file(filepath.clone(), realtime_delay_on, close_when_done_true); + assert_eq!(conn.name(), String::from("piksi-relay-1min.sbp")); + assert!(conn.close_when_done()); + assert_eq!(conn.realtime_delay(), RealtimeDelay::On); + let conn = Connection::file(filepath, realtime_delay_off, close_when_done_false); + assert!(!conn.close_when_done()); + assert_eq!(conn.realtime_delay(), RealtimeDelay::Off); + } + + #[test] + fn create_serial() { + let device = String::from("/dev/ttyUSB0"); + let baudrate = 115200; + let flow = FlowControl::from_str(FLOW_CONTROL_NONE).unwrap(); + let conn = Connection::serial(device.clone(), baudrate, flow); + assert_eq!(conn.name(), format!("{} @{}", device, baudrate)); + assert!(!conn.close_when_done()); + assert_eq!(conn.realtime_delay(), RealtimeDelay::Off); + } + + fn receive_thread(client_recv: mpsc::Receiver>) -> JoinHandle<()> { + thread::spawn(move || { + let mut iter_count = 0; + + loop { + if client_recv.recv().is_err() { + break; + } + + iter_count += 1; + } + assert!(iter_count > 0); + }) + } + + #[test] + #[serial] + fn connect_to_file_test() { + let bfilename = filename(); + backup_file(bfilename.clone()); + let shared_state = SharedState::new(); + let (client_send_, client_receive) = mpsc::channel::>(); + let client_send = ClientSender { + inner: client_send_, + }; + let connection_state = ConnectionState::new(client_send, shared_state.clone()); + let filename = TEST_SHORT_FILEPATH.to_string(); + receive_thread(client_receive); + assert!(!shared_state.is_running()); + connection_state.connect_to_file( + filename, + RealtimeDelay::On, + /*close_when_done = */ true, + ); + sleep(Duration::from_millis( + DELAY_BEFORE_CHECKING_APP_STARTED_IN_MS, + )); + assert!(shared_state.is_running()); + sleep(Duration::from_secs_f64(SBP_FILE_SHORT_DURATION_SEC)); + assert!(!shared_state.is_running()); + restore_backup_file(bfilename); + } + + #[test] + #[serial] + fn pause_via_connect_to_file_test() { + let bfilename = filename(); + backup_file(bfilename.clone()); + let shared_state = SharedState::new(); + let (client_send_, client_receive) = mpsc::channel::>(); + let client_send = ClientSender { + inner: client_send_, + }; + let connection_state = ConnectionState::new(client_send, shared_state.clone()); + let filename = TEST_SHORT_FILEPATH.to_string(); + receive_thread(client_receive); + assert!(!shared_state.is_running()); + connection_state.connect_to_file( + filename, + RealtimeDelay::On, + /*close_when_done = */ true, + ); + sleep(Duration::from_millis( + DELAY_BEFORE_CHECKING_APP_STARTED_IN_MS, + )); + assert!(shared_state.is_running()); + shared_state.set_paused(true); + sleep(Duration::from_secs_f64(SBP_FILE_SHORT_DURATION_SEC)); + assert!(shared_state.is_running()); + shared_state.set_paused(false); + sleep(Duration::from_secs_f64(SBP_FILE_SHORT_DURATION_SEC)); + assert!(!shared_state.is_running()); + restore_backup_file(bfilename); + } + + #[test] + #[serial] + fn disconnect_via_connect_to_file_test() { + let bfilename = filename(); + backup_file(bfilename.clone()); + let shared_state = SharedState::new(); + let (client_send_, client_receive) = mpsc::channel::>(); + let client_send = ClientSender { + inner: client_send_, + }; + let connection_state = ConnectionState::new(client_send.clone(), shared_state.clone()); + let filename = TEST_FILEPATH.to_string(); + let expected_duration = Duration::from_secs_f64(SERVER_STATE_CONNECTION_LOOP_TIMEOUT_SEC) + + Duration::from_millis(100); + let handle = receive_thread(client_receive); + assert!(!shared_state.is_running()); + { + connection_state.connect_to_file( + filename, + RealtimeDelay::On, + /*close_when_done = */ true, + ); + } + + sleep(Duration::from_millis(5)); + assert!(shared_state.is_running()); + let now = SystemTime::now(); + sleep(Duration::from_millis(1)); + connection_state.disconnect(client_send.clone()); + sleep(Duration::from_millis(10)); + assert!(!shared_state.is_running()); + shared_state.stop_server_running(); + drop(client_send); + assert!(handle.join().is_ok()); + + match now.elapsed() { + Ok(elapsed) => { + assert!( + elapsed < expected_duration, + "Time elapsed for disconnect test {:?}, expecting {:?}ms", + elapsed, + expected_duration + ); + } + Err(e) => { + panic!("unknown error {}", e); + } + } + restore_backup_file(bfilename); + } + + // TODO(johnmichael.burke@) [CPP-111] Need to implement unittest for TCPStream. + // #[test] + // fn connect_to_host_test() { + // } + + // TODO(johnmichael.burke@) [CPP-111] Need to implement unittest for serial. + // #[test] + // fn connect_to_serial_test() { + // } +} diff --git a/console_backend/src/constants.rs b/console_backend/src/constants.rs index c5d562ee9..3a3c91672 100644 --- a/console_backend/src/constants.rs +++ b/console_backend/src/constants.rs @@ -239,3 +239,5 @@ pub(crate) const UNKNOWN_ERROR: &str = "Unk Error"; pub(crate) const UNKNOWN_ERROR_SHORT: &str = "unk"; pub(crate) const ODO_POSTFIX: &str = "+Odo"; pub(crate) const INS_POSTFIX: &str = "+INS"; + +pub(crate) const SERVER_STATE_CONNECTION_LOOP_TIMEOUT_SEC: f64 = 1.0; diff --git a/console_backend/src/errors.rs b/console_backend/src/errors.rs index 44535169e..8ec8484d4 100644 --- a/console_backend/src/errors.rs +++ b/console_backend/src/errors.rs @@ -10,3 +10,7 @@ pub(crate) const GET_MUT_OBJECT_FAILURE: &str = "error trying to get mut object" pub(crate) const UNABLE_TO_STOP_TIMER_THREAD_FAILURE: &str = "unable to kill running timer thread"; pub(crate) const UNABLE_TO_SEND_INS_UPDATE_FAILURE: &str = "unable to send an ins status update"; pub(crate) const THREAD_JOIN_FAILURE: &str = "thread join failure"; +pub(crate) const TCP_CONNECTION_PARSING_FAILURE: &str = + "unable to parse the provided string for ip string"; +pub(crate) const SERVER_STATE_NEW_CONNECTION_FAILURE: &str = "server state new connection failure"; +pub(crate) const SERVER_STATE_DISCONNECT_FAILURE: &str = "server state disconnect failure"; diff --git a/console_backend/src/lib.rs b/console_backend/src/lib.rs index 657222bb6..44bf34a52 100644 --- a/console_backend/src/lib.rs +++ b/console_backend/src/lib.rs @@ -7,6 +7,7 @@ pub mod console_backend_capnp { } pub mod broadcaster; pub mod common_constants; +pub mod connection; pub mod constants; pub mod date_conv; pub mod errors; @@ -28,3 +29,61 @@ pub mod status_bar; pub mod tracking_signals_tab; pub mod types; pub mod utils; + +#[cfg(test)] +pub mod test_common { + use crate::constants::*; + use directories::UserDirs; + use std::{fs, path::PathBuf}; + + pub mod data_directories { + #![allow(dead_code)] + pub const LINUX: &str = ".local/share/swift_navigation_console"; + pub const MACOS: &str = + "Library/Application Support/com.swift-nav.swift-nav.swift_navigation_console"; + pub const WINDOWS: &str = "AppData\\Local\\swift-nav\\swift_navigation_console\\data"; + } + + pub fn filename() -> PathBuf { + let user_dirs = UserDirs::new().unwrap(); + let home_dir = user_dirs.home_dir(); + #[cfg(target_os = "linux")] + { + home_dir + .join(data_directories::LINUX) + .join(CONNECTION_HISTORY_FILENAME) + } + + #[cfg(target_os = "macos")] + { + home_dir + .join(data_directories::MACOS) + .join(CONNECTION_HISTORY_FILENAME) + } + #[cfg(target_os = "windows")] + { + home_dir + .join(data_directories::WINDOWS) + .join(CONNECTION_HISTORY_FILENAME) + } + } + + pub fn backup_file(filename: PathBuf) { + if filename.exists() { + let mut backup_filename = filename.clone(); + backup_filename.set_extension("backup"); + fs::rename(filename, backup_filename).unwrap(); + } + } + + pub fn restore_backup_file(filename: PathBuf) { + let mut backup_filename = filename.clone(); + backup_filename.set_extension("backup"); + if filename.exists() { + fs::remove_file(filename.clone()).unwrap(); + } + if backup_filename.exists() { + fs::rename(backup_filename, filename).unwrap(); + } + } +} diff --git a/console_backend/src/process_messages.rs b/console_backend/src/process_messages.rs index bc8276b0c..559717445 100644 --- a/console_backend/src/process_messages.rs +++ b/console_backend/src/process_messages.rs @@ -1,30 +1,46 @@ use log::{debug, error}; -use sbp::sbp_tools::SBPTools; +use sbp::sbp_tools::{ControlFlow, SBPTools}; use sbp::{ messages::{SBPMessage, SBP}, serialize::SbpSerialize, }; -use std::{thread::sleep, time::Duration}; +use std::{io::ErrorKind, thread::sleep, time::Duration}; +use crate::connection::Connection; use crate::constants::PAUSE_LOOP_SLEEP_DURATION_MS; use crate::log_panel::handle_log_msg; use crate::main_tab::*; use crate::types::*; +use crate::utils::{close_frontend, refresh_navbar}; pub fn process_messages( conn: Connection, shared_state: SharedState, - client_send: S, - realtime_delay: RealtimeDelay, -) where + mut client_send: S, +) -> Result<()> +where S: MessageSender, { - let (rdr, _) = conn.into_io(); - let mut main = MainTab::new(shared_state.clone(), client_send); + shared_state.set_running(true, client_send.clone()); + let realtime_delay = conn.realtime_delay(); + let (rdr, _) = conn.try_connect(Some(shared_state.clone()))?; + shared_state.set_current_connection(conn.name()); + refresh_navbar(&mut client_send.clone(), shared_state.clone()); + let mut main = MainTab::new(shared_state.clone(), client_send.clone()); let messages = sbp::iter_messages(rdr) - .log_errors(log::Level::Debug) + .handle_errors(|e| { + debug!("{}", e); + match e { + sbp::Error::IoError(err) => { + if (*err).kind() == ErrorKind::TimedOut { + shared_state.set_running(false, client_send.clone()); + } + ControlFlow::Break + } + _ => ControlFlow::Continue, + } + }) .with_rover_time(); - for (message, gps_time) in messages { if !shared_state.is_running() { if let Err(e) = main.end_csv_logging() { @@ -183,4 +199,9 @@ pub fn process_messages( } log::logger().flush(); } + if conn.close_when_done() { + shared_state.set_running(false, client_send.clone()); + close_frontend(&mut client_send); + } + Ok(()) } diff --git a/console_backend/src/server.rs b/console_backend/src/server.rs index b4bec3525..e7b16e2c4 100644 --- a/console_backend/src/server.rs +++ b/console_backend/src/server.rs @@ -5,8 +5,7 @@ use pyo3::prelude::*; use pyo3::types::PyBytes; use async_logger_log::Logger; -use log::warn; - +use log::error; use std::{ io::{BufReader, Cursor}, path::PathBuf, @@ -16,12 +15,13 @@ use std::{ }; use crate::cli_options::*; +use crate::connection::ConnectionState; use crate::console_backend_capnp as m; use crate::constants::LOG_WRITER_BUFFER_MESSAGE_COUNT; use crate::errors::*; use crate::log_panel::{splitable_log_formatter, LogLevel, LogPanelWriter}; use crate::output::{CsvLogging, SbpLogging}; -use crate::types::{ClientSender, FlowControl, ServerState, SharedState}; +use crate::types::{ClientSender, FlowControl, RealtimeDelay, SharedState}; use crate::utils::{refresh_loggingbar, refresh_navbar}; /// The backend server @@ -61,34 +61,18 @@ impl ServerEndpoint { /// /// # Parameters /// - `opt`: CLI Options to start specific connection type. -/// - `server_state`: The Server state to start a specific connection. +/// - `connection_state`: The Server state to start a specific connection. /// - `client_send`: Client Sender channel for communication from backend to frontend. /// - `shared_state`: The shared state for validating another connection is not already running. -fn handle_cli( - opt: CliOptions, - server_state: ServerState, - client_send: ClientSender, - shared_state: SharedState, -) { +fn handle_cli(opt: CliOptions, connection_state: &ConnectionState, shared_state: SharedState) { if let Some(opt_input) = opt.input { match opt_input { Input::Tcp { host, port } => { - if let Err(e) = - server_state.connect_to_host(client_send, shared_state.clone(), host, port) - { - warn!("failed to connect over tcp: {}", e); - } + connection_state.connect_to_host(host, port); } Input::File { file_in } => { let filename = file_in.display().to_string(); - if let Err(e) = server_state.connect_to_file( - client_send, - shared_state.clone(), - filename, - opt.exit_after, - ) { - warn!("failed to connect to file: {}", e); - } + connection_state.connect_to_file(filename, RealtimeDelay::On, opt.exit_after); } Input::Serial { serialport, @@ -96,15 +80,7 @@ fn handle_cli( flow_control, } => { let serialport = serialport.display().to_string(); - if let Err(e) = server_state.connect_to_serial( - client_send, - shared_state.clone(), - serialport, - baudrate, - flow_control, - ) { - warn!("failed to connect over serial: {}", e); - } + connection_state.connect_to_serial(serialport, baudrate, flow_control); } } } @@ -165,7 +141,7 @@ impl Server { inner: client_send_, }; let shared_state = SharedState::new(); - let server_state = ServerState::new(); + let connection_state = ConnectionState::new(client_send.clone(), shared_state.clone()); let logger = Logger::builder() .buf_size(LOG_WRITER_BUFFER_MESSAGE_COUNT) @@ -177,12 +153,7 @@ impl Server { log::set_boxed_logger(Box::new(logger)).expect("Failed to set logger"); // Handle CLI Opts. - handle_cli( - opt, - server_state.clone(), - client_send.clone(), - shared_state.clone(), - ); + handle_cli(opt, &connection_state, shared_state.clone()); refresh_navbar(&mut client_send.clone(), shared_state.clone()); refresh_loggingbar(&mut client_send.clone(), shared_state.clone()); thread::spawn(move || loop { @@ -200,7 +171,7 @@ impl Server { let message = match message.which() { Ok(msg) => msg, Err(e) => { - eprintln!("error reading message: {}", e); + error!("error reading message: {}", e); continue; } }; @@ -215,11 +186,10 @@ impl Server { let request = match request.which() { Ok(msg) => msg, Err(e) => { - eprintln!("error reading message: {}", e); + error!("error reading message: {}", e); continue; } }; - let server_state_clone = server_state.clone(); let shared_state_clone = shared_state.clone(); let client_send_clone = client_send.clone(); match request { @@ -227,23 +197,18 @@ impl Server { refresh_navbar(&mut client_send_clone.clone(), shared_state_clone); } m::message::DisconnectRequest(Ok(_)) => { - shared_state_clone.set_running(false, client_send_clone.clone()); - server_state_clone.connection_join(); - println!("Disconnected successfully."); + connection_state.disconnect(client_send_clone.clone()); } m::message::FileRequest(Ok(req)) => { let filename = req .get_filename() .expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); let filename = filename.to_string(); - if let Err(e) = server_state_clone.connect_to_file( - client_send_clone, - shared_state_clone, + connection_state.connect_to_file( filename, - /*close_when_done = */ false, - ) { - warn!("Failed to connect to file: {}", e); - } + RealtimeDelay::On, + /*close_when_done*/ false, + ); } m::message::PauseRequest(Ok(_)) => { if shared_state_clone.is_paused() { @@ -256,14 +221,7 @@ impl Server { let host = req.get_host().expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); let port = req.get_port(); - if let Err(e) = server_state_clone.connect_to_host( - client_send_clone, - shared_state_clone, - host.to_string(), - port, - ) { - warn!("Failed to connect over tcp: {}", e); - } + connection_state.connect_to_host(host.to_string(), port); } m::message::SerialRequest(Ok(req)) => { let device = @@ -272,15 +230,7 @@ impl Server { let baudrate = req.get_baudrate(); let flow = req.get_flow_control().unwrap(); let flow = FlowControl::from_str(flow).unwrap(); - if let Err(e) = server_state_clone.connect_to_serial( - client_send_clone, - shared_state_clone, - device, - baudrate, - flow, - ) { - warn!("Failed to connect over serial: {}", e); - } + connection_state.connect_to_serial(device, baudrate, flow); } _ => println!("err"), } @@ -375,7 +325,7 @@ impl Server { (*shared_data).baseline_tab.reset = cv_in.get_reset_filters(); } _ => { - eprintln!("unknown message from front-end"); + error!("unknown message from front-end"); } } } else { diff --git a/console_backend/src/types.rs b/console_backend/src/types.rs index 22ec7bd4d..e698385dc 100644 --- a/console_backend/src/types.rs +++ b/console_backend/src/types.rs @@ -4,14 +4,14 @@ use crate::errors::*; use crate::log_panel::LogLevel; use crate::output::CsvLogging; use crate::piksi_tools_constants::*; -use crate::process_messages::process_messages; -use crate::utils::{close_frontend, mm_to_m, ms_to_sec, refresh_navbar, set_connected_frontend}; + +use crate::utils::{mm_to_m, ms_to_sec, set_connected_frontend}; use anyhow::{Context, Result as AHResult}; use chrono::{DateTime, Utc}; use directories::{ProjectDirs, UserDirs}; use indexmap::set::IndexSet; use lazy_static::lazy_static; -use log::{error, info}; +use log::error; use ordered_float::OrderedFloat; use sbp::codec::dencode::{FramedWrite, IterSinkExt}; use sbp::codec::sbp::SbpEncoder; @@ -36,19 +36,15 @@ use std::{ fmt::Debug, fs, hash::Hash, - io, - net::TcpStream, ops::Deref, path::PathBuf, str::FromStr, sync::{ + self, atomic::{AtomicBool, Ordering::*}, - mpsc::Sender, Arc, Mutex, }, - thread, - thread::JoinHandle, - time::{Duration, Instant}, + time::Instant, }; pub type Error = std::boxed::Box; @@ -121,7 +117,7 @@ pub trait MessageSender: Debug + Clone + Send { #[derive(Debug, Clone)] pub struct ClientSender { - pub inner: Sender>, + pub inner: sync::mpsc::Sender>, } impl MessageSender for ClientSender { fn send_data(&mut self, msg_bytes: Vec) { @@ -168,147 +164,6 @@ impl Clone for ArcBool { } } -#[derive(Debug)] -pub struct ServerState(Arc>); - -impl Deref for ServerState { - type Target = Mutex; - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl Default for ServerState { - fn default() -> Self { - Self::new() - } -} - -impl Clone for ServerState { - fn clone(&self) -> Self { - ServerState { - 0: Arc::clone(&self.0), - } - } -} - -impl ServerState { - pub fn new() -> ServerState { - ServerState(Arc::new(Mutex::new(ServerStateInner::default()))) - } - pub fn new_connection(&self, new_thread: JoinHandle<()>) { - let mut shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); - (*shared_data).handler = Some(new_thread); - } - pub fn connection_join(&self) { - let mut shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); - let handler = &mut (*shared_data).handler; - if let Some(handle) = handler.take() { - handle.join().unwrap(); - } - } - - /// Helper function for attempting to open a file and process SBP messages from it. - /// - /// # Parameters - /// - `client_send`: Client Sender channel for communication from backend to frontend. - /// - `shared_state`: The shared state for validating another connection is not already running. - /// - `filename`: The path to the filename to be read for SBP messages. - pub fn connect_to_file( - &self, - client_send: ClientSender, - shared_state: SharedState, - filename: String, - close_when_done: bool, - ) -> Result<()> { - let conn = Connection::file(filename.clone())?; - info!("Opened file successfully!"); - shared_state.update_file_history(filename); - self.connect( - conn, - client_send, - shared_state, - RealtimeDelay::On, - close_when_done, - ); - Ok(()) - } - - /// Helper function for attempting to open a tcp connection and process SBP messages from it. - /// - /// # Parameters - /// - `client_send`: Client Sender channel for communication from backend to frontend. - /// - `shared_state`: The shared state for validating another connection is not already running. - /// - `host`: The host portion of the TCP stream to open. - /// - `port`: The port to be used to open a TCP stream. - pub fn connect_to_host( - &self, - client_send: ClientSender, - shared_state: SharedState, - host: String, - port: u16, - ) -> Result<()> { - let conn = Connection::tcp(host.clone(), port)?; - info!("Connected to tcp stream!"); - shared_state.update_tcp_history(host, port); - self.connect(conn, client_send, shared_state, RealtimeDelay::Off, false); - Ok(()) - } - - /// Helper function for attempting to open a serial port and process SBP messages from it. - /// - /// # Parameters - /// - `client_send`: Client Sender channel for communication from backend to frontend. - /// - `shared_state`: The shared state for validating another connection is not already running. - /// - `device`: The string path corresponding to the serial device to connect with. - /// - `baudrate`: The baudrate to use when communicating with the serial device. - /// - `flow`: The flow control mode to use when communicating with the serial device. - pub fn connect_to_serial( - &self, - client_send: ClientSender, - shared_state: SharedState, - device: String, - baudrate: u32, - flow: FlowControl, - ) -> Result<()> { - let conn = Connection::serial(device, baudrate, flow)?; - info!("Connected to serialport!"); - self.connect(conn, client_send, shared_state, RealtimeDelay::Off, false); - Ok(()) - } - - fn connect( - &self, - conn: Connection, - mut client_send: ClientSender, - shared_state: SharedState, - delay: RealtimeDelay, - close_when_done: bool, - ) { - shared_state.set_current_connection(conn.name.clone()); - shared_state.set_running(true, client_send.clone()); - self.connection_join(); - self.new_connection(thread::spawn(move || { - refresh_navbar(&mut client_send, shared_state.clone()); - process_messages(conn, shared_state.clone(), client_send.clone(), delay); - if close_when_done { - close_frontend(&mut client_send); - } - shared_state.set_running(false, client_send); - })); - } -} - -#[derive(Debug, Default)] -pub struct ServerStateInner { - pub handler: Option>, -} -impl ServerStateInner { - pub fn new() -> ServerStateInner { - ServerStateInner { handler: None } - } -} - #[derive(Debug)] pub struct SharedState(Arc>); @@ -320,7 +175,10 @@ impl SharedState { let shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); (*shared_data).running } - pub fn set_running(&self, set_to: bool, mut client_send: ClientSender) { + pub fn set_running(&self, set_to: bool, mut client_send: S) + where + S: MessageSender, + { if set_to { set_connected_frontend(cc::ApplicationStates::CONNECTED, &mut client_send); } else { @@ -330,6 +188,15 @@ impl SharedState { let mut shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); (*shared_data).running = set_to; } + pub fn is_server_running(&self) -> bool { + let shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); + (*shared_data).server_running + } + pub fn stop_server_running(&self) { + let mut shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); + (*shared_data).running = false; + (*shared_data).server_running = false; + } pub fn is_paused(&self) -> bool { let shared_data = self.lock().expect(SHARED_STATE_LOCK_MUTEX_FAILURE); (*shared_data).paused @@ -450,6 +317,7 @@ pub struct SharedStateInner { pub(crate) paused: bool, pub(crate) connection_history: ConnectionHistory, pub(crate) running: bool, + pub(crate) server_running: bool, pub(crate) solution_tab: SolutionTabState, pub(crate) baseline_tab: BaselineTabState, } @@ -465,6 +333,7 @@ impl SharedStateInner { paused: false, connection_history, running: false, + server_running: true, solution_tab: SolutionTabState::new(), baseline_tab: BaselineTabState::new(), } @@ -623,6 +492,7 @@ impl SolutionVelocityTabState { } // Main Tab Types. +#[derive(Clone, Copy, Debug, PartialEq)] pub enum RealtimeDelay { On, Off, @@ -815,7 +685,7 @@ impl ConnectionHistory { } // Enum wrapping around various Vel NED Message types. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct FlowControl(SPFlowControl); impl FromStr for FlowControl { @@ -1841,73 +1711,11 @@ impl std::str::FromStr for VelocityUnits { } } -pub struct Connection { - pub name: String, - pub rdr: Box, - pub wtr: Box, -} - -impl Connection { - pub fn tcp(host: String, port: u16) -> Result { - let name = format!("{}:{}", host, port); - let rdr = TcpStream::connect(&name)?; - let wtr = rdr.try_clone()?; - Ok(Self { - name, - rdr: Box::new(rdr), - wtr: Box::new(wtr), - }) - } - - pub fn serial(device: String, baudrate: u32, flow: FlowControl) -> Result { - let rdr = serialport::new(&device, baudrate) - .flow_control(*flow) - .timeout(Duration::from_millis(SERIALPORT_READ_TIMEOUT_MS)) - .open()?; - let wtr = rdr.try_clone()?; - Ok(Self { - name: device, - rdr: Box::new(rdr), - wtr: Box::new(wtr), - }) - } - - pub fn file(filename: String) -> Result { - let rdr = fs::File::open(&filename)?; - let wtr = io::sink(); - Ok(Self { - name: filename, - rdr: Box::new(rdr), - wtr: Box::new(wtr), - }) - } - - pub fn into_io(self) -> (Box, Box) { - (self.rdr, self.wtr) - } -} - #[cfg(test)] mod tests { use super::*; + use crate::test_common::{backup_file, data_directories, filename, restore_backup_file}; use serial_test::serial; - use std::{ - sync::mpsc, - thread::sleep, - time::{Duration, SystemTime}, - }; - const TEST_FILEPATH: &str = "./tests/data/piksi-relay-1min.sbp"; - const TEST_SHORT_FILEPATH: &str = "./tests/data/piksi-relay.sbp"; - const SBP_FILE_SHORT_DURATION_SEC: f64 = 27.1; - const DELAY_BEFORE_CHECKING_APP_STARTED_IN_MS: u64 = 150; - - pub mod data_directories { - #![allow(dead_code)] - pub const LINUX: &str = ".local/share/swift_navigation_console"; - pub const MACOS: &str = - "Library/Application Support/com.swift-nav.swift-nav.swift_navigation_console"; - pub const WINDOWS: &str = "AppData\\Local\\swift-nav\\swift_navigation_console\\data"; - } #[test] fn create_data_dir_test() { @@ -1929,49 +1737,6 @@ mod tests { } } - fn filename() -> PathBuf { - let user_dirs = UserDirs::new().unwrap(); - let home_dir = user_dirs.home_dir(); - #[cfg(target_os = "linux")] - { - home_dir - .join(data_directories::LINUX) - .join(CONNECTION_HISTORY_FILENAME) - } - - #[cfg(target_os = "macos")] - { - home_dir - .join(data_directories::MACOS) - .join(CONNECTION_HISTORY_FILENAME) - } - #[cfg(target_os = "windows")] - { - home_dir - .join(data_directories::WINDOWS) - .join(CONNECTION_HISTORY_FILENAME) - } - } - - fn backup_file(filename: PathBuf) { - if filename.exists() { - let mut backup_filename = filename.clone(); - backup_filename.set_extension("backup"); - fs::rename(filename, backup_filename).unwrap(); - } - } - - fn restore_backup_file(filename: PathBuf) { - let mut backup_filename = filename.clone(); - backup_filename.set_extension("backup"); - if filename.exists() { - fs::remove_file(filename.clone()).unwrap(); - } - if backup_filename.exists() { - fs::rename(backup_filename, filename).unwrap(); - } - } - #[test] #[serial] fn connection_history_save_test() { @@ -2036,145 +1801,4 @@ mod tests { assert_eq!(conn_history.files().len(), MAX_CONNECTION_HISTORY as usize); restore_backup_file(bfilename); } - - fn receive_thread(client_recv: mpsc::Receiver>) -> JoinHandle<()> { - thread::spawn(move || { - let mut iter_count = 0; - - loop { - if client_recv.recv().is_err() { - break; - } - - iter_count += 1; - } - assert!(iter_count > 0); - }) - } - - #[test] - #[serial] - fn connect_to_file_test() { - let bfilename = filename(); - backup_file(bfilename.clone()); - let shared_state = SharedState::new(); - let (client_send_, client_receive) = mpsc::channel::>(); - let client_send = ClientSender { - inner: client_send_, - }; - let server_state = ServerState::new(); - let filename = TEST_SHORT_FILEPATH.to_string(); - receive_thread(client_receive); - assert!(!shared_state.is_running()); - server_state - .connect_to_file( - client_send, - shared_state.clone(), - filename, - /*close_when_done = */ true, - ) - .unwrap(); - sleep(Duration::from_millis( - DELAY_BEFORE_CHECKING_APP_STARTED_IN_MS, - )); - assert!(shared_state.is_running()); - sleep(Duration::from_secs_f64(SBP_FILE_SHORT_DURATION_SEC)); - assert!(!shared_state.is_running()); - restore_backup_file(bfilename); - } - - #[test] - #[serial] - fn pause_via_connect_to_file_test() { - let bfilename = filename(); - backup_file(bfilename.clone()); - let shared_state = SharedState::new(); - let (client_send_, client_receive) = mpsc::channel::>(); - let client_send = ClientSender { - inner: client_send_, - }; - let server_state = ServerState::new(); - let filename = TEST_SHORT_FILEPATH.to_string(); - receive_thread(client_receive); - assert!(!shared_state.is_running()); - server_state - .connect_to_file( - client_send, - shared_state.clone(), - filename, - /*close_when_done = */ true, - ) - .unwrap(); - sleep(Duration::from_millis( - DELAY_BEFORE_CHECKING_APP_STARTED_IN_MS, - )); - assert!(shared_state.is_running()); - shared_state.set_paused(true); - sleep(Duration::from_secs_f64(SBP_FILE_SHORT_DURATION_SEC)); - assert!(shared_state.is_running()); - shared_state.set_paused(false); - sleep(Duration::from_secs_f64(SBP_FILE_SHORT_DURATION_SEC)); - assert!(!shared_state.is_running()); - restore_backup_file(bfilename); - } - - #[test] - #[serial] - fn disconnect_via_connect_to_file_test() { - let bfilename = filename(); - backup_file(bfilename.clone()); - let shared_state = SharedState::new(); - let (client_send_, client_receive) = mpsc::channel::>(); - let client_send = ClientSender { - inner: client_send_, - }; - let server_state = ServerState::new(); - let filename = TEST_FILEPATH.to_string(); - let expected_duration = Duration::from_millis(100); - let handle = receive_thread(client_receive); - assert!(!shared_state.is_running()); - { - server_state - .connect_to_file( - client_send.clone(), - shared_state.clone(), - filename, - /*close_when_done = */ true, - ) - .unwrap(); - } - - sleep(Duration::from_millis(5)); - assert!(shared_state.is_running()); - let now = SystemTime::now(); - sleep(Duration::from_millis(1)); - shared_state.set_running(false, client_send); - sleep(Duration::from_millis(10)); - assert!(handle.join().is_ok()); - - match now.elapsed() { - Ok(elapsed) => { - assert!( - elapsed < expected_duration, - "Time elapsed for disconnect test {:?}, expecting {:?}ms", - elapsed, - expected_duration - ); - } - Err(e) => { - panic!("unknown error {}", e); - } - } - restore_backup_file(bfilename); - } - - // TODO(johnmichael.burke@) [CPP-111] Need to implement unittest for TCPStream. - // #[test] - // fn connect_to_host_test() { - // } - - // TODO(johnmichael.burke@) [CPP-111] Need to implement unittest for serial. - // #[test] - // fn connect_to_serial_test() { - // } } diff --git a/console_backend/tests/mem_benches.rs b/console_backend/tests/mem_benches.rs index 19c7ab638..44ecf268c 100644 --- a/console_backend/tests/mem_benches.rs +++ b/console_backend/tests/mem_benches.rs @@ -12,8 +12,9 @@ mod mem_bench_impl { use sysinfo::{get_current_pid, ProcessExt, System, SystemExt}; use console_backend::{ + connection::Connection, process_messages, - types::{ClientSender, Connection, RealtimeDelay, SharedState}, + types::{ClientSender, RealtimeDelay, SharedState}, }; const BENCH_FILEPATH: &str = "./tests/data/piksi-relay-1min.sbp"; @@ -98,8 +99,12 @@ mod mem_bench_impl { }; let shared_state = SharedState::new(); shared_state.set_running(true, client_send.clone()); - let conn = Connection::file(BENCH_FILEPATH.into()).unwrap(); - process_messages::process_messages(conn, shared_state, client_send, RealtimeDelay::On); + let conn = Connection::file( + BENCH_FILEPATH.into(), + RealtimeDelay::On, + /*close_when_done=*/ true, + ); + process_messages::process_messages(conn, shared_state, client_send).unwrap(); } recv_thread.join().expect("join should succeed"); mem_read_thread.join().expect("join should succeed");