diff --git a/Cargo.lock b/Cargo.lock index 983bfee7f..7b82ac86d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -357,12 +357,14 @@ dependencies = [ "ordered-float", "paste 1.0.5", "pyo3", + "rand", "sbp", "serde", "serde_json", "serde_yaml", "serial_test", "serialport", + "slotmap", "strum", "strum_macros", "sysinfo", @@ -1393,7 +1395,7 @@ dependencies = [ [[package]] name = "sbp" version = "3.4.8-alpha" -source = "git+https://github.com/swift-nav/libsbp.git?rev=a5c221684439f2124306a2daa44126aac5457c80#a5c221684439f2124306a2daa44126aac5457c80" +source = "git+https://github.com/swift-nav/libsbp.git?rev=ce5e3d14ae4284c53982bc3fd79a2fa15976ff9f#ce5e3d14ae4284c53982bc3fd79a2fa15976ff9f" dependencies = [ "base64", "byteorder", @@ -1533,6 +1535,15 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42a568c8f2cd051a4d283bd6eb0343ac214c1b0f1ac19f93e1175b2dee38c73d" +[[package]] +name = "slotmap" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "585cd5dffe4e9e06f6dfdf66708b70aca3f781bed561f4f667b2d9c0d4559e36" +dependencies = [ + "version_check", +] + [[package]] name = "smallvec" version = "1.6.1" diff --git a/console_backend/Cargo.toml b/console_backend/Cargo.toml index 22f5a961c..9c3e55b18 100644 --- a/console_backend/Cargo.toml +++ b/console_backend/Cargo.toml @@ -16,8 +16,8 @@ capnp = "0.14" chrono = { version = "0.4", features = ["serde"] } csv = "1" paste = "1" -pyo3 = { version = "0.13", features = ["extension-module"] } -sbp = { git = "https://github.com/swift-nav/libsbp.git", rev = "a5c221684439f2124306a2daa44126aac5457c80", features = ["json", "swiftnav-rs"] } +pyo3 = { version = "0.13", features = ["extension-module"], optional = true } +sbp = { git = "https://github.com/swift-nav/libsbp.git", rev = "ce5e3d14ae4284c53982bc3fd79a2fa15976ff9f", features = ["json", "swiftnav-rs"] } serde = { version = "1.0.123", features = ["derive"] } tempfile = "3.2.0" ordered-float = "2.0" @@ -33,6 +33,8 @@ clap = "3.0.0-beta.2" indexmap = { version = "1.6.2", features = ["serde"] } serde_json = { version = "1" } crossbeam = "0.8.1" +rand = "0.8.3" +slotmap = "1.0.3" [target.'cfg(any(target_os = "macos", target_os = "windows"))'.dependencies] serialport = "4.0.1" @@ -53,7 +55,12 @@ name = "console_backend" crate-type = ["cdylib", "lib"] bench = false +[[bin]] +name = "fileio" +path = "src/bin/fileio.rs" +bench = false + [features] -default = [] +default = ["pyo3"] benches = [] tests = [] diff --git a/console_backend/benches/cpu_benches.rs b/console_backend/benches/cpu_benches.rs index 39bc24dd1..5780a1d6a 100644 --- a/console_backend/benches/cpu_benches.rs +++ b/console_backend/benches/cpu_benches.rs @@ -3,7 +3,7 @@ use criterion::{criterion_group, criterion_main, Criterion}; use glob::glob; use sbp::sbp_tools::SBPTools; use std::{ - fs, + fs, io, path::Path, sync::{mpsc, Arc, Mutex}, thread, time, @@ -12,7 +12,7 @@ use std::{ extern crate console_backend; use console_backend::{ process_messages, - types::{ClientSender, RealtimeDelay, SharedState}, + types::{ClientSender, Connection, RealtimeDelay, SharedState}, }; const BENCH_FILEPATH: &str = "./tests/data/piksi-relay.sbp"; @@ -61,15 +61,8 @@ fn run_process_messages(file_in_name: &str, failure: bool) { inner: client_send_, }; shared_state.set_running(true, client_send.clone()); - match fs::File::open(file_in_name) { - Ok(fileopen) => process_messages::process_messages( - fileopen, - shared_state, - client_send, - RealtimeDelay::Off, - ), - Err(e) => panic!("unable to read file, {}.", e), - } + let conn = Connection::file(file_in_name.into()).unwrap(); + process_messages::process_messages(conn, shared_state, client_send, RealtimeDelay::Off); } recv_thread.join().expect("join should succeed"); } diff --git a/console_backend/src/bin/fileio.rs b/console_backend/src/bin/fileio.rs new file mode 100644 index 000000000..496898179 --- /dev/null +++ b/console_backend/src/bin/fileio.rs @@ -0,0 +1,133 @@ +use std::{ + fs, + io::{self, Write}, +}; + +use clap::Clap; +use crossbeam::{channel, scope}; +use sbp::sbp_tools::SBPTools; + +use console_backend::{ + broadcaster::Broadcaster, + cli_options::Input, + fileio::Fileio, + types::{MsgSender, Result}, +}; + +#[derive(Clap, Debug)] +#[clap(about = "Fileio operations.")] +pub enum Opts { + /// Write a file from local source to remote destination dest. + Write { + source: String, + dest: String, + #[clap(subcommand)] + input: Input, + }, + + /// Read a file from remote source to local dest. If no dest is provided, file is read to stdout. + Read { + source: String, + dest: Option, + #[clap(subcommand)] + input: Input, + }, + + /// List a directory. + List { + path: String, + #[clap(subcommand)] + input: Input, + }, + + /// Delete a file. + Delete { + path: String, + #[clap(subcommand)] + input: Input, + }, +} + +fn main() -> Result<()> { + let bc = Broadcaster::new(); + + let (done_tx, done_rx) = channel::bounded(0); + + let bc_source = bc.clone(); + let run = move |rdr| { + let messages = sbp::iter_messages(rdr).log_errors(log::Level::Debug); + for msg in messages { + bc_source.send(&msg); + if done_rx.try_recv().is_ok() { + break; + } + } + }; + + match Opts::parse() { + Opts::Write { + source, + dest, + input, + } => { + let (rdr, wtr) = input.into_conn()?.into_io(); + let sender = MsgSender::new(wtr); + scope(|s| { + s.spawn(|_| run(rdr)); + let mut fileio = Fileio::new(bc, sender); + let data = fs::File::open(source)?; + fileio.overwrite(dest, data)?; + eprintln!("file written successfully."); + done_tx.send(true).unwrap(); + Result::Ok(()) + }) + .unwrap() + } + Opts::Read { + source, + dest, + input, + } => { + let (rdr, wtr) = input.into_conn()?.into_io(); + let sender = MsgSender::new(wtr); + scope(|s| { + s.spawn(|_| run(rdr)); + let mut fileio = Fileio::new(bc, sender); + let dest: Box = match dest { + Some(path) => Box::new(fs::File::create(path)?), + None => Box::new(io::stdout()), + }; + fileio.read(source, dest)?; + done_tx.send(true).unwrap(); + Result::Ok(()) + }) + .unwrap() + } + Opts::List { path, input } => { + let (rdr, wtr) = input.into_conn()?.into_io(); + let sender = MsgSender::new(wtr); + scope(|s| { + s.spawn(|_| run(rdr)); + let mut fileio = Fileio::new(bc, sender); + let files = fileio.readdir(path)?; + eprintln!("{:#?}", files); + done_tx.send(true).unwrap(); + Result::Ok(()) + }) + .unwrap() + } + Opts::Delete { path, input } => { + let (rdr, wtr) = input.into_conn()?.into_io(); + let sender = MsgSender::new(wtr); + scope(|s| { + s.spawn(|_| run(rdr)); + let fileio = Fileio::new(bc, sender); + fileio.remove(path)?; + eprintln!("file deleted."); + done_tx.send(true).unwrap(); + Result::Ok(()) + }) + .unwrap() + } + } +} diff --git a/console_backend/src/broadcaster.rs b/console_backend/src/broadcaster.rs new file mode 100644 index 000000000..b252863c6 --- /dev/null +++ b/console_backend/src/broadcaster.rs @@ -0,0 +1,333 @@ +use std::{ + convert::TryInto, + marker::PhantomData, + sync::{Arc, Mutex}, + time::Duration, +}; + +use crossbeam::channel; +use sbp::messages::{ConcreteMessage, SBPMessage, SBP}; +use slotmap::HopSlotMap; + +pub struct Broadcaster { + channels: Arc>>, +} + +impl Broadcaster { + const CHANNELS_LOCK_FAILURE: &'static str = "failed to aquire lock on channels"; + + pub fn new() -> Self { + Self { + channels: Arc::new(Mutex::new(HopSlotMap::with_key())), + } + } + + pub fn send(&self, message: &SBP) { + let msg_type = message.get_message_type(); + let mut channels = self.channels.lock().expect(Self::CHANNELS_LOCK_FAILURE); + channels.retain(|_, chan| { + if chan.msg_types.iter().any(|ty| ty == &msg_type) { + chan.inner.send(message.clone()).is_ok() + } else { + true + } + }); + } + + pub fn subscribe(&self) -> (Receiver, Key) + where + E: Event, + { + let (tx, rx) = channel::unbounded(); + let mut channels = self.channels.lock().expect(Self::CHANNELS_LOCK_FAILURE); + let key = channels.insert(Sender { + inner: tx, + msg_types: E::MESSAGE_TYPES, + }); + ( + Receiver { + inner: rx, + marker: PhantomData, + }, + Key { inner: key }, + ) + } + + pub fn unsubscribe(&self, key: Key) { + let mut channels = self.channels.lock().expect(Self::CHANNELS_LOCK_FAILURE); + channels.remove(key.inner); + } + + /// Wait once for a specific message. Returns an error if `dur` elapses. + pub fn wait(&self, dur: Duration) -> Result + where + E: Event, + { + let (rx, _) = self.subscribe::(); + rx.recv_timeout(dur) + } +} + +impl Clone for Broadcaster { + fn clone(&self) -> Self { + Self { + channels: Arc::clone(&self.channels), + } + } +} + +impl Default for Broadcaster { + fn default() -> Self { + Self::new() + } +} + +/// A wrapper around a channel sender that knows what message types its receivers expect. +struct Sender { + inner: channel::Sender, + msg_types: &'static [u16], +} + +/// A wrapper around a channel receiver that converts to the appropriate event. +pub struct Receiver { + inner: channel::Receiver, + marker: PhantomData, +} + +slotmap::new_key_type! { + struct KeyInner; +} + +/// Returned along with calls to `subscribe`. Used to unsubscribe to an event. +pub struct Key { + inner: KeyInner, +} + +impl Receiver +where + E: Event, +{ + pub fn recv(&self) -> Result { + let msg = self.inner.recv()?; + Ok(E::from_sbp(msg)) + } + + pub fn recv_timeout(&self, dur: Duration) -> Result { + let msg = self.inner.recv_timeout(dur)?; + Ok(E::from_sbp(msg)) + } + + pub fn try_recv(&self) -> Result { + let msg = self.inner.try_recv()?; + Ok(E::from_sbp(msg)) + } + + pub fn iter(&self) -> impl Iterator + '_ { + self.inner.iter().map(E::from_sbp) + } + + pub fn try_iter(&self) -> impl Iterator + '_ { + self.inner.try_iter().map(E::from_sbp) + } + + // other channel methods as needed +} + +/// An event you can receive via a `Broadcaster`. It's implemented for all SBP +/// message types, but could also be implemented for composite messages like `Observations`. +pub trait Event { + /// The message types from which the event can be derived. + const MESSAGE_TYPES: &'static [u16]; + + /// Conversion from SBP. The message type of `msg` is guaranteed to be in + /// `Self::MESSAGE_TYPES`. + fn from_sbp(msg: SBP) -> Self; +} + +// All concrete message types can be used as events. +impl Event for T +where + T: ConcreteMessage, +{ + const MESSAGE_TYPES: &'static [u16] = &[T::MESSAGE_TYPE]; + + fn from_sbp(msg: SBP) -> Self { + msg.try_into().unwrap() + } +} + +#[cfg(test)] +mod tests { + use crossbeam::scope; + use sbp::messages::{ + self, + observation::{MsgObs, MsgObsDepA, ObservationHeader, ObservationHeaderDep}, + }; + + use super::*; + + #[test] + fn test_broadcaster() { + let b = Broadcaster::new(); + let (msg_obs, _) = b.subscribe::(); + + b.send(&make_msg_obs()); + b.send(&make_msg_obs_dep_a()); + + assert!(msg_obs.try_recv().is_ok()); + assert!(msg_obs.try_recv().is_err()); + } + + #[test] + fn test_multiple_subs() { + let b = Broadcaster::new(); + let (msg_obs1, _) = b.subscribe::(); + let (msg_obs2, _) = b.subscribe::(); + + b.send(&make_msg_obs()); + b.send(&make_msg_obs()); + + assert_eq!(msg_obs1.try_iter().count(), 2); + assert_eq!(msg_obs2.try_iter().count(), 2); + } + + #[test] + fn test_unsubscribe() { + let b = Broadcaster::new(); + + let (msg_obs1, key) = b.subscribe::(); + let (msg_obs2, _) = b.subscribe::(); + + b.send(&make_msg_obs()); + assert_eq!(msg_obs1.try_iter().count(), 1); + assert_eq!(msg_obs2.try_iter().count(), 1); + + b.unsubscribe(key); + + b.send(&make_msg_obs()); + assert_eq!(msg_obs1.try_iter().count(), 0); + assert_eq!(msg_obs2.try_iter().count(), 1); + } + + #[test] + fn test_iter_subscription() { + let b = Broadcaster::new(); + + let (msg_obs, key) = b.subscribe::(); + + scope(|s| { + s.spawn(|_| { + // initial non-blocking call should have no messages + assert_eq!(msg_obs.try_iter().count(), 0); + + // blocking call will get the two messages + assert_eq!(msg_obs.iter().count(), 2); + }); + + std::thread::sleep(Duration::from_secs(1)); + + b.send(&make_msg_obs()); + b.send(&make_msg_obs_dep_a()); + b.send(&make_msg_obs()); + + // msg_obs.iter() goes forever if you don't drop the channel + b.unsubscribe(key); + }) + .unwrap(); + } + + #[test] + fn test_wait() { + let b = Broadcaster::new(); + + scope(|s| { + s.spawn(|_| { + std::thread::sleep(Duration::from_secs(1)); + b.send(&make_msg_obs()) + }); + + assert!(b.wait::(Duration::from_secs(2)).is_ok()); + }) + .unwrap() + } + + #[test] + fn test_wait_timeout() { + let b = Broadcaster::new(); + + scope(|s| { + s.spawn(|_| { + std::thread::sleep(Duration::from_secs(2)); + b.send(&make_msg_obs()) + }); + + assert!(b.wait::(Duration::from_secs(1)).is_err()); + }) + .unwrap() + } + + #[test] + fn test_custom_event() { + let b = Broadcaster::new(); + + let (obs_msg, _) = b.subscribe::(); + let (msg_obs, _) = b.subscribe::(); + + b.send(&make_msg_obs()); + b.send(&make_msg_obs_dep_a()); + + // ObsMsg should accept both MsgObs and MsgObsDepA + assert!(obs_msg.try_recv().is_ok()); + assert!(obs_msg.try_recv().is_ok()); + assert!(obs_msg.try_recv().is_err()); + + // just MsgObs + assert!(msg_obs.try_recv().is_ok()); + assert!(msg_obs.try_recv().is_err()); + } + + enum ObsMsg { + Obs(MsgObs), + DepA(MsgObsDepA), + } + + impl Event for ObsMsg { + const MESSAGE_TYPES: &'static [u16] = &[MsgObs::MESSAGE_TYPE, MsgObsDepA::MESSAGE_TYPE]; + + fn from_sbp(msg: SBP) -> Self { + match msg { + SBP::MsgObs(m) => ObsMsg::Obs(m), + SBP::MsgObsDepA(m) => ObsMsg::DepA(m), + _ => unreachable!("wrong event keys"), + } + } + } + + fn make_msg_obs() -> SBP { + MsgObs { + sender_id: Some(1), + header: ObservationHeader { + t: messages::gnss::GPSTime { + tow: 1, + ns_residual: 1, + wn: 1, + }, + n_obs: 1, + }, + obs: vec![], + } + .into() + } + + fn make_msg_obs_dep_a() -> SBP { + MsgObsDepA { + sender_id: Some(1), + header: ObservationHeaderDep { + t: messages::gnss::GPSTimeDep { tow: 1, wn: 1 }, + n_obs: 1, + }, + obs: vec![], + } + .into() + } +} diff --git a/console_backend/src/cli_options.rs b/console_backend/src/cli_options.rs index 9b4ca3b09..12819645c 100644 --- a/console_backend/src/cli_options.rs +++ b/console_backend/src/cli_options.rs @@ -6,9 +6,12 @@ use std::{ }; use strum::VariantNames; -use crate::common_constants::{SbpLogging, Tabs}; use crate::constants::{AVAILABLE_BAUDRATES, AVAILABLE_REFRESH_RATES}; use crate::types::FlowControl; +use crate::{ + common_constants::{SbpLogging, Tabs}, + types::Connection, +}; #[derive(Debug)] pub struct CliTabs(Tabs); @@ -147,6 +150,20 @@ pub enum Input { }, } +impl Input { + pub fn into_conn(self) -> crate::types::Result { + match self { + Input::Tcp { host, port } => Connection::tcp(host, port), + Input::Serial { + serialport, + baudrate, + flow_control, + } => Connection::serial(serialport.to_string_lossy().into(), baudrate, flow_control), + Input::File { file_in } => Connection::file(file_in.to_string_lossy().into()), + } + } +} + /// Validation for the refresh-rate cli option. /// /// # Parameters diff --git a/console_backend/src/fileio.rs b/console_backend/src/fileio.rs new file mode 100644 index 000000000..49c7f7aad --- /dev/null +++ b/console_backend/src/fileio.rs @@ -0,0 +1,510 @@ +use std::{ + collections::HashMap, + io::{BufRead, BufReader, Read, Write}, + time::{Duration, Instant}, +}; + +use crossbeam::{atomic::AtomicCell, channel, scope, select, utils::Backoff}; +use rand::Rng; +use sbp::messages::{ + file_io::{ + MsgFileioConfigReq, MsgFileioConfigResp, MsgFileioReadDirReq, MsgFileioReadDirResp, + MsgFileioReadReq, MsgFileioReadResp, MsgFileioRemove, MsgFileioWriteReq, + MsgFileioWriteResp, + }, + SBP, +}; + +use crate::{ + broadcaster::Broadcaster, + types::{MsgSender, Result}, +}; + +const MAX_RETRIES: usize = 20; + +const READDIR_TIMEOUT: Duration = Duration::from_secs(5); +const CONFIG_REQ_RETRY: Duration = Duration::from_millis(100); +const CONFIG_REQ_TIMEOUT: Duration = Duration::from_secs(10); +const CHECK_INTERVAL: Duration = Duration::from_millis(100); +const FILE_IO_TIMEOUT: Duration = Duration::from_secs(3); + +const MAX_PAYLOAD_SIZE: usize = 255; +const READ_CHUNK_SIZE: usize = MAX_PAYLOAD_SIZE - 4; +const SEQUENCE_LEN: usize = 4; +const OFFSET_LEN: usize = 4; +const NULL_SEP_LEN: usize = 1; +const WRITE_REQ_OVERHEAD_LEN: usize = SEQUENCE_LEN + OFFSET_LEN + NULL_SEP_LEN; + +pub struct Fileio { + broadcast: Broadcaster, + sender: MsgSender, + config: Option, +} + +impl Fileio +where + W: Write + Send, +{ + pub fn new(broadcast: Broadcaster, sender: MsgSender) -> Self { + Self { + broadcast, + sender, + config: None, + } + } + + pub fn read(&mut self, path: String, mut dest: impl Write) -> Result<()> { + let config = self.fetch_config(); + + let sender = self.sender.clone(); + let send_msg = move |sequence, offset| { + sender.send(SBP::from(MsgFileioReadReq { + sender_id: None, + filename: path.clone().into(), + chunk_size: READ_CHUNK_SIZE as u8, + sequence, + offset, + })) + }; + + let (stop_req_tx, stop_req_rx) = channel::bounded(0); + let (req_tx, req_rx) = channel::unbounded(); + + let (res_tx, res_rx) = channel::unbounded(); + + let open_requests = AtomicCell::new(0u32); + + let mut sequence = new_sequence(); + // sequence number of the request we need to write to `dest` next + let mut current_sequence = sequence; + // holds data while we wait for out of order requests + let mut data: HashMap> = HashMap::new(); + let mut pending: HashMap = HashMap::new(); + let mut last_sent = false; + + scope(|s| { + s.spawn(|_| { + let mut offset = 0; + let backoff = Backoff::new(); + + while stop_req_rx.try_recv().is_err() { + while open_requests.load() >= config.window_size { + backoff.snooze(); + } + send_msg(sequence, offset)?; + req_tx.send((sequence, ReadReq::new(offset))).unwrap(); + offset += READ_CHUNK_SIZE as u32; + sequence += 1; + open_requests.fetch_add(1); + } + + sbp::Result::Ok(()) + }); + + let (sub, key) = self.broadcast.subscribe::(); + s.spawn(move |_| { + for res in sub.iter() { + res_tx.send(res).unwrap(); + } + }); + + loop { + select! { + recv(req_rx) -> msg => { + let (sequence, request) = msg?; + pending.insert(sequence, request); + }, + recv(res_rx) -> msg => { + let msg = msg?; + let req = match pending.remove(&msg.sequence) { + Some(req) => req, + None => continue, + }; + let bytes_read = msg.contents.len(); + if msg.sequence == current_sequence { + dest.write_all(&msg.contents)?; + current_sequence += 1; + while let Some(d) = data.remove(¤t_sequence) { + dest.write_all(&d)?; + current_sequence += 1; + } + } else { + data.insert(req.offset, msg.contents); + } + open_requests.fetch_sub(1); + if !last_sent && bytes_read != READ_CHUNK_SIZE as usize { + last_sent = true; + stop_req_tx.send(true).unwrap(); + } + if last_sent && open_requests.load() == 0 { + break + } + }, + recv(channel::tick(CHECK_INTERVAL)) -> _ => { + for (seq, req) in pending.iter_mut() { + if req.expired() { + req.track_retry()?; + send_msg(*seq, req.offset)?; + } + } + } + } + } + + self.broadcast.unsubscribe(key); + + Ok(()) + }) + .unwrap() + } + + /// Deletes `filename` on the remote device (if it exists) and writes the contents of `data` to the file. + /// This operation is NOT atomic. If the write fails and `filename` existed, it is gone forever. + /// For more context see: https://github.com/swift-nav/console_pp/pull/72#discussion_r654751414 + pub fn overwrite(&mut self, filename: String, data: impl Read) -> Result<()> { + self.remove(filename.clone())?; + + let mut data = BufReader::new(data); + let mut state = WriteState::new(filename); + + loop { + let buf = data.fill_buf()?; + let bytes_read = buf.len(); + if bytes_read == 0 { + break; + } + state = self.write_slice(state, buf)?; + data.consume(bytes_read); + } + + Ok(()) + } + + fn write_slice(&mut self, mut state: WriteState, data: &[u8]) -> Result { + let config = self.fetch_config(); + + let (req_tx, req_rx) = channel::unbounded(); + let (res_tx, res_rx) = channel::unbounded(); + + let open_requests = AtomicCell::new(0u32); + + let sender = self.sender.clone(); + let send_msg = |state: &WriteState, req: &WriteReq| { + sender.send(SBP::from(MsgFileioWriteReq { + sender_id: None, + sequence: state.sequence, + offset: state.offset as u32, + filename: state.filename(), + data: data[req.offset..req.end_offset].to_vec(), + })) + }; + + let data_len = data.len(); + + scope(|s| { + s.spawn(|_| { + let backoff = Backoff::new(); + let mut slice_offset = 0; + + while slice_offset < data_len { + while open_requests.load() >= config.window_size { + backoff.snooze(); + } + let end_offset = std::cmp::min(slice_offset + state.chunk_size, data_len); + let chunk_len = std::cmp::min(state.chunk_size, data_len - slice_offset); + let req = WriteReq::new(slice_offset, end_offset, chunk_len < state.chunk_size); + send_msg(&state, &req)?; + req_tx.send((state.clone(), req)).unwrap(); + state.update(chunk_len); + slice_offset += chunk_len; + open_requests.fetch_add(1); + } + + sbp::Result::Ok(()) + }); + + let (sub, key) = self.broadcast.subscribe::(); + s.spawn(move |_| { + for res in sub.iter() { + res_tx.send(res).unwrap(); + } + }); + + let mut pending: HashMap = HashMap::new(); + let mut last_sent = false; + + loop { + select! { + recv(req_rx) -> msg => { + let (req_state, req) = msg?; + if req.is_last { + last_sent = true; + } + pending.insert(req_state.sequence, (req_state, req)); + }, + recv(res_rx) -> msg => { + let msg = msg?; + if pending.remove(&msg.sequence).is_none() { + continue + } + open_requests.fetch_sub(1); + if last_sent && open_requests.load() == 0 { + break; + } + }, + recv(channel::tick(CHECK_INTERVAL)) -> _ => { + for (req_state, req) in pending.values_mut() { + if req.expired() { + req.track_retry()?; + send_msg(req_state, req)?; + } + } + } + } + } + + self.broadcast.unsubscribe(key); + + Result::Ok(()) + }) + .unwrap()?; + + Ok(state) + } + + pub fn readdir(&mut self, path: String) -> Result> { + let mut seq = new_sequence(); + let mut files = vec![]; + + loop { + self.sender.send(SBP::from(MsgFileioReadDirReq { + sender_id: None, + sequence: seq, + offset: files.len() as u32, + dirname: path.clone().into(), + }))?; + + let reply = self + .broadcast + .wait::(READDIR_TIMEOUT)?; + + if reply.sequence != seq { + return Err(format!( + "MsgFileioReadDirResp didn't match request ({} vs {})", + reply.sequence, seq + ) + .into()); + } + + let mut contents = reply.contents; + + if contents.is_empty() { + return Ok(files); + } + if contents[contents.len() - 1] == b'\0' { + contents.remove(contents.len() - 1); + } + for f in contents.split(|b| b == &b'\0') { + files.push(String::from_utf8_lossy(f).into_owned()); + } + seq += 1; + } + } + + pub fn remove(&self, filename: String) -> Result<()> { + self.sender.send(SBP::from(MsgFileioRemove { + sender_id: None, + filename: filename.into(), + }))?; + Ok(()) + } + + fn fetch_config(&mut self) -> FileioConfig { + if let Some(ref config) = self.config { + return config.clone(); + } + + let sequence = new_sequence(); + let (tx, rx) = channel::bounded(0); + + let config = scope(|s| { + s.spawn(|_| { + while rx.try_recv().is_err() { + let _ = self.sender.send(SBP::from(MsgFileioConfigReq { + sender_id: None, + sequence, + })); + std::thread::sleep(CONFIG_REQ_RETRY); + } + }); + + let config = self + .broadcast + .wait::(CONFIG_REQ_TIMEOUT) + .map_or_else(|_| Default::default(), Into::into); + + tx.send(true).unwrap(); + + config + }) + .unwrap(); + + self.config = Some(config); + self.config.clone().unwrap() + } +} + +/// State that spans an entire call to `write` (i.e. potentially multiple `write_slice` calls) +#[derive(Debug, Clone)] +struct WriteState { + sequence: u32, + /// Offset into the file (not the current slice of data) + offset: usize, + filename: String, + chunk_size: usize, +} + +impl WriteState { + fn new(filename: String) -> Self { + let (chunk_size, filename) = if filename.ends_with('\x00') { + ( + MAX_PAYLOAD_SIZE - WRITE_REQ_OVERHEAD_LEN - filename.len() - 1, + filename, + ) + } else { + ( + MAX_PAYLOAD_SIZE - WRITE_REQ_OVERHEAD_LEN - filename.len(), + filename + "\x00", + ) + }; + Self { + sequence: new_sequence(), + offset: 0, + filename, + chunk_size, + } + } + + fn filename(&self) -> sbp::SbpString { + self.filename.clone().into() + } + + fn update(&mut self, chunk_len: usize) { + self.offset += chunk_len; + self.sequence += 1; + } +} + +struct FileioRequest { + sent_at: Instant, + retries: usize, +} + +impl FileioRequest { + fn new() -> Self { + Self { + sent_at: Instant::now(), + retries: 0, + } + } + + fn expired(&self) -> bool { + self.sent_at.elapsed() >= FILE_IO_TIMEOUT + } + + fn track_retry(&mut self) -> Result<()> { + self.retries += 1; + self.sent_at = Instant::now(); + + if self.retries >= MAX_RETRIES { + Err("fileio send message timeout".into()) + } else { + Ok(()) + } + } +} + +impl Default for FileioRequest { + fn default() -> Self { + Self::new() + } +} + +struct ReadReq { + offset: u32, + req: FileioRequest, +} + +impl ReadReq { + fn new(offset: u32) -> Self { + Self { + offset, + req: FileioRequest::new(), + } + } + + fn expired(&self) -> bool { + self.req.expired() + } + + fn track_retry(&mut self) -> Result<()> { + self.req.track_retry() + } +} + +struct WriteReq { + /// Offset start into current slice of data + offset: usize, + /// Offset end into current slice of data + end_offset: usize, + /// Is this the last request for this chunk of data + is_last: bool, + req: FileioRequest, +} + +impl WriteReq { + fn new(offset: usize, end_offset: usize, is_last: bool) -> Self { + Self { + offset, + end_offset, + is_last, + req: FileioRequest::new(), + } + } + + fn expired(&self) -> bool { + self.req.expired() + } + + fn track_retry(&mut self) -> Result<()> { + self.req.track_retry() + } +} + +#[derive(Debug, Clone)] +struct FileioConfig { + window_size: u32, + batch_size: u32, +} + +impl From for FileioConfig { + fn from(msg: MsgFileioConfigResp) -> Self { + FileioConfig { + window_size: msg.window_size, + batch_size: msg.batch_size, + } + } +} + +impl Default for FileioConfig { + fn default() -> Self { + FileioConfig { + window_size: 100, + batch_size: 1, + } + } +} + +fn new_sequence() -> u32 { + rand::thread_rng().gen_range(0..0xfffffff) +} diff --git a/console_backend/src/lib.rs b/console_backend/src/lib.rs index 2106f39bb..7575870e2 100644 --- a/console_backend/src/lib.rs +++ b/console_backend/src/lib.rs @@ -4,10 +4,12 @@ pub mod cli_options; pub mod console_backend_capnp { include!(concat!(env!("OUT_DIR"), "/console_backend_capnp.rs")); } +pub mod broadcaster; pub mod common_constants; pub mod constants; pub mod date_conv; pub mod errors; +pub mod fileio; pub mod formatters; pub mod fusion_status_flags; pub mod log_panel; @@ -17,7 +19,7 @@ pub mod output; pub mod piksi_tools_constants; pub mod process_messages; #[cfg(not(test))] -#[cfg(all(not(feature = "benches"), not(feature = "tests")))] +#[cfg(all(not(feature = "benches"), not(feature = "tests"), feature = "pyo3"))] pub mod server; pub mod solution_tab; pub mod solution_velocity_tab; diff --git a/console_backend/src/process_messages.rs b/console_backend/src/process_messages.rs index 2dbad8ffb..f7e6d35cb 100644 --- a/console_backend/src/process_messages.rs +++ b/console_backend/src/process_messages.rs @@ -11,17 +11,20 @@ use crate::log_panel::handle_log_msg; use crate::main_tab::*; use crate::types::*; -pub fn process_messages( - messages: T, +pub fn process_messages( + conn: Connection, shared_state: SharedState, client_send: S, realtime_delay: RealtimeDelay, -) { +) where + S: MessageSender, +{ + let (rdr, _) = conn.into_io(); let mut main = MainTab::new(shared_state.clone(), client_send); - - let messages = sbp::iter_messages(messages) + let messages = sbp::iter_messages(rdr) .log_errors(log::Level::Debug) .with_rover_time(); + for (message, gps_time) in messages { if !shared_state.is_running() { if let Err(e) = main.end_csv_logging() { diff --git a/console_backend/src/server.rs b/console_backend/src/server.rs index c42fbf409..1087e9ced 100644 --- a/console_backend/src/server.rs +++ b/console_backend/src/server.rs @@ -5,6 +5,7 @@ use pyo3::prelude::*; use pyo3::types::PyBytes; use async_logger_log::Logger; +use log::warn; use std::{ io::{BufReader, Cursor}, @@ -73,16 +74,22 @@ fn handle_cli( if let Some(opt_input) = opt.input { match opt_input { Input::Tcp { host, port } => { - server_state.connect_to_host(client_send, shared_state.clone(), host, port); + if let Err(e) = + server_state.connect_to_host(client_send, shared_state.clone(), host, port) + { + warn!("failed to connect over tcp: {}", e); + } } Input::File { file_in } => { let filename = file_in.display().to_string(); - server_state.connect_to_file( + if let Err(e) = server_state.connect_to_file( client_send, shared_state.clone(), filename, opt.exit_after, - ); + ) { + warn!("failed to connect to file: {}", e); + } } Input::Serial { serialport, @@ -90,13 +97,15 @@ fn handle_cli( flow_control, } => { let serialport = serialport.display().to_string(); - server_state.connect_to_serial( + if let Err(e) = server_state.connect_to_serial( client_send, shared_state.clone(), serialport, baudrate, flow_control, - ); + ) { + warn!("failed to connect over serial: {}", e); + } } } } @@ -222,12 +231,14 @@ impl Server { .get_filename() .expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); let filename = filename.to_string(); - server_state_clone.connect_to_file( + if let Err(e) = server_state_clone.connect_to_file( client_send_clone, shared_state_clone, filename, /*close_when_done = */ false, - ); + ) { + warn!("Failed to connect to file: {}", e); + } } m::message::PauseRequest(Ok(_)) => { if shared_state_clone.is_paused() { @@ -240,12 +251,14 @@ impl Server { let host = req.get_host().expect(CAP_N_PROTO_DESERIALIZATION_FAILURE); let port = req.get_port(); - server_state_clone.connect_to_host( + if let Err(e) = server_state_clone.connect_to_host( client_send_clone, shared_state_clone, host.to_string(), port, - ); + ) { + warn!("Failed to connect over tcp: {}", e); + } } m::message::SerialRequest(Ok(req)) => { let device = @@ -254,13 +267,15 @@ impl Server { let baudrate = req.get_baudrate(); let flow = req.get_flow_control().unwrap(); let flow = FlowControl::from_str(flow).unwrap(); - server_state_clone.connect_to_serial( + if let Err(e) = server_state_clone.connect_to_serial( client_send_clone, shared_state_clone, device, baudrate, flow, - ); + ) { + warn!("Failed to connect over serial: {}", e); + } } _ => println!("err"), } diff --git a/console_backend/src/types.rs b/console_backend/src/types.rs index c41b0d23e..f8bfc708b 100644 --- a/console_backend/src/types.rs +++ b/console_backend/src/types.rs @@ -11,8 +11,11 @@ use chrono::{DateTime, Utc}; use directories::{ProjectDirs, UserDirs}; use indexmap::set::IndexSet; use lazy_static::lazy_static; -use log::{error, info, warn}; +use log::{error, info}; use ordered_float::OrderedFloat; +use sbp::codec::dencode::{FramedWrite, IterSinkExt}; +use sbp::codec::sbp::SbpEncoder; +use sbp::messages::SBPMessage; use sbp::messages::{ navigation::{ MsgBaselineNED, MsgBaselineNEDDepA, MsgDops, MsgDopsDepA, MsgPosLLH, MsgPosLLHDepA, @@ -22,6 +25,7 @@ use sbp::messages::{ MsgObs, MsgObsDepB, MsgObsDepC, MsgOsr, PackedObsContent, PackedObsContentDepB, PackedObsContentDepC, PackedOsrContent, }, + SBP, }; use serde::{Deserialize, Serialize}; use serialport::FlowControl as SPFlowControl; @@ -32,6 +36,7 @@ use std::{ fmt::Debug, fs, hash::Hash, + io, net::TcpStream, ops::Deref, path::PathBuf, @@ -50,6 +55,38 @@ pub type Error = std::boxed::Box; pub type Result = std::result::Result; pub type UtcDateTime = DateTime; +/// Sends SBP messages to the connected device +pub struct MsgSender { + inner: Arc>>, +} + +impl MsgSender { + /// 42 is the conventional sender ID intended for messages sent from the host to the device + const SENDER_ID: u16 = 42; + const LOCK_FAILURE: &'static str = "failed to aquire sender lock"; + + pub fn new(wtr: W) -> Self { + Self { + inner: Arc::new(Mutex::new(FramedWrite::new(wtr, SbpEncoder::new()))), + } + } + + pub fn send(&self, mut msg: SBP) -> sbp::Result<()> { + msg.set_sender_id(Self::SENDER_ID); + let mut framed = self.inner.lock().expect(Self::LOCK_FAILURE); + framed.send(msg)?; + Ok(()) + } +} + +impl Clone for MsgSender { + fn clone(&self) -> Self { + MsgSender { + inner: Arc::clone(&self.inner), + } + } +} + #[derive(Debug, Clone)] pub struct Deque { d: Vec, @@ -183,32 +220,18 @@ impl ServerState { shared_state: SharedState, filename: String, close_when_done: bool, - ) { - let shared_state_clone = shared_state.clone(); - self.connection_join(); - shared_state_clone.set_running(true, client_send.clone()); - let handle = thread::spawn(move || { - if let Ok(stream) = fs::File::open(&filename) { - println!("Opened file successfully!"); - shared_state_clone.update_file_history(filename.clone()); - shared_state_clone.set_current_connection(filename); - refresh_navbar(&mut client_send.clone(), shared_state.clone()); - let shared_state_clone_ = shared_state.clone(); - process_messages( - stream, - shared_state_clone_, - client_send.clone(), - RealtimeDelay::On, - ); - if close_when_done { - close_frontend(&mut client_send.clone()); - } - } else { - println!("Couldn't open file..."); - } - shared_state.set_running(false, client_send.clone()); - }); - self.new_connection(handle); + ) -> Result<()> { + let conn = Connection::file(filename.clone())?; + info!("Opened file successfully!"); + shared_state.update_file_history(filename); + self.connect( + conn, + client_send, + shared_state, + RealtimeDelay::On, + close_when_done, + ); + Ok(()) } /// Helper function for attempting to open a tcp connection and process SBP messages from it. @@ -224,30 +247,12 @@ impl ServerState { shared_state: SharedState, host: String, port: u16, - ) { - let shared_state_clone = shared_state.clone(); - shared_state_clone.set_running(true, client_send.clone()); - self.connection_join(); - let handle = thread::spawn(move || { - let shared_state_clone = shared_state.clone(); - let host_port = format!("{}:{}", host, port); - if let Ok(stream) = TcpStream::connect(host_port.clone()) { - info!("Connected to the server {}!", host_port); - shared_state_clone.update_tcp_history(host, port); - shared_state_clone.set_current_connection(host_port); - refresh_navbar(&mut client_send.clone(), shared_state.clone()); - process_messages( - stream, - shared_state_clone, - client_send.clone(), - RealtimeDelay::Off, - ); - } else { - warn!("Couldn't connect to server..."); - } - shared_state.set_running(false, client_send.clone()); - }); - self.new_connection(handle); + ) -> Result<()> { + let conn = Connection::tcp(host.clone(), port)?; + info!("Connected to tcp stream!"); + shared_state.update_tcp_history(host, port); + self.connect(conn, client_send, shared_state, RealtimeDelay::Off, false); + Ok(()) } /// Helper function for attempting to open a serial port and process SBP messages from it. @@ -265,32 +270,32 @@ impl ServerState { device: String, baudrate: u32, flow: FlowControl, + ) -> Result<()> { + let conn = Connection::serial(device, baudrate, flow)?; + info!("Connected to serialport!"); + self.connect(conn, client_send, shared_state, RealtimeDelay::Off, false); + Ok(()) + } + + fn connect( + &self, + conn: Connection, + mut client_send: ClientSender, + shared_state: SharedState, + delay: RealtimeDelay, + close_when_done: bool, ) { - let shared_state_clone = shared_state.clone(); - shared_state_clone.set_running(true, client_send.clone()); + shared_state.set_current_connection(conn.name.clone()); + shared_state.set_running(true, client_send.clone()); self.connection_join(); - let handle = thread::spawn(move || { - let shared_state_clone = shared_state.clone(); - match serialport::new(&device, baudrate) - .flow_control(*flow) - .timeout(Duration::from_millis(SERIALPORT_READ_TIMEOUT_MS)) - .open() - { - Ok(port) => { - println!("Connected to serialport {}.", device); - shared_state_clone.set_current_connection(format!("{} @{}", device, baudrate)); - process_messages( - port, - shared_state_clone, - client_send.clone(), - RealtimeDelay::Off, - ); - } - Err(e) => eprintln!("Unable to connect to serialport: {}", e), + self.new_connection(thread::spawn(move || { + refresh_navbar(&mut client_send, shared_state.clone()); + process_messages(conn, shared_state.clone(), client_send.clone(), delay); + if close_when_done { + close_frontend(&mut client_send); } - shared_state.set_running(false, client_send.clone()); - }); - self.new_connection(handle); + shared_state.set_running(false, client_send); + })); } } @@ -1802,6 +1807,52 @@ pub struct VelLog { pub num_signals: u8, } +pub struct Connection { + pub name: String, + pub rdr: Box, + pub wtr: Box, +} + +impl Connection { + pub fn tcp(host: String, port: u16) -> Result { + let name = format!("{}:{}", host, port); + let rdr = TcpStream::connect(&name)?; + let wtr = rdr.try_clone()?; + Ok(Self { + name, + rdr: Box::new(rdr), + wtr: Box::new(wtr), + }) + } + + pub fn serial(device: String, baudrate: u32, flow: FlowControl) -> Result { + let rdr = serialport::new(&device, baudrate) + .flow_control(*flow) + .timeout(Duration::from_millis(SERIALPORT_READ_TIMEOUT_MS)) + .open()?; + let wtr = rdr.try_clone()?; + Ok(Self { + name: device, + rdr: Box::new(rdr), + wtr: Box::new(wtr), + }) + } + + pub fn file(filename: String) -> Result { + let rdr = fs::File::open(&filename)?; + let wtr = io::sink(); + Ok(Self { + name: filename, + rdr: Box::new(rdr), + wtr: Box::new(wtr), + }) + } + + pub fn into_io(self) -> (Box, Box) { + (self.rdr, self.wtr) + } +} + #[cfg(test)] mod tests { use super::*; @@ -1981,12 +2032,14 @@ mod tests { let filename = TEST_SHORT_FILEPATH.to_string(); receive_thread(client_receive); assert!(!shared_state.is_running()); - server_state.connect_to_file( - client_send, - shared_state.clone(), - filename, - /*close_when_done = */ true, - ); + server_state + .connect_to_file( + client_send, + shared_state.clone(), + filename, + /*close_when_done = */ true, + ) + .unwrap(); sleep(Duration::from_millis( DELAY_BEFORE_CHECKING_APP_STARTED_IN_MS, )); @@ -2010,12 +2063,14 @@ mod tests { let filename = TEST_SHORT_FILEPATH.to_string(); receive_thread(client_receive); assert!(!shared_state.is_running()); - server_state.connect_to_file( - client_send, - shared_state.clone(), - filename, - /*close_when_done = */ true, - ); + server_state + .connect_to_file( + client_send, + shared_state.clone(), + filename, + /*close_when_done = */ true, + ) + .unwrap(); sleep(Duration::from_millis( DELAY_BEFORE_CHECKING_APP_STARTED_IN_MS, )); @@ -2045,12 +2100,14 @@ mod tests { let handle = receive_thread(client_receive); assert!(!shared_state.is_running()); { - server_state.connect_to_file( - client_send.clone(), - shared_state.clone(), - filename, - /*close_when_done = */ true, - ); + server_state + .connect_to_file( + client_send.clone(), + shared_state.clone(), + filename, + /*close_when_done = */ true, + ) + .unwrap(); } sleep(Duration::from_millis(5)); diff --git a/console_backend/src/utils.rs b/console_backend/src/utils.rs index b492ab54e..70fe6db66 100644 --- a/console_backend/src/utils.rs +++ b/console_backend/src/utils.rs @@ -1,10 +1,11 @@ +use std::collections::HashMap; + use capnp::message::Builder; use capnp::message::HeapAllocator; use capnp::serialize; use indexmap::IndexSet; use log::warn; use serialport::available_ports; -use std::collections::HashMap; use crate::constants::*; use crate::errors::*; diff --git a/console_backend/tests/mem_benches.rs b/console_backend/tests/mem_benches.rs index 8f9130aa5..19c7ab638 100644 --- a/console_backend/tests/mem_benches.rs +++ b/console_backend/tests/mem_benches.rs @@ -4,7 +4,6 @@ mod mem_bench_impl { use ndarray::{ArrayView, Axis, Dim}; use std::{ error::Error, - fs, result::Result, sync::{mpsc, Arc, Mutex}, thread, @@ -14,7 +13,7 @@ mod mem_bench_impl { use console_backend::{ process_messages, - types::{ClientSender, RealtimeDelay, SharedState}, + types::{ClientSender, Connection, RealtimeDelay, SharedState}, }; const BENCH_FILEPATH: &str = "./tests/data/piksi-relay-1min.sbp"; @@ -99,15 +98,8 @@ mod mem_bench_impl { }; let shared_state = SharedState::new(); shared_state.set_running(true, client_send.clone()); - match fs::File::open(BENCH_FILEPATH) { - Ok(fileopen) => process_messages::process_messages( - fileopen, - shared_state, - client_send, - RealtimeDelay::On, - ), - Err(e) => panic!("unable to read file, {}.", e), - } + let conn = Connection::file(BENCH_FILEPATH.into()).unwrap(); + process_messages::process_messages(conn, shared_state, client_send, RealtimeDelay::On); } recv_thread.join().expect("join should succeed"); mem_read_thread.join().expect("join should succeed"); diff --git a/utils/glob.ds b/utils/glob.ds index 4d5b77ff9..3d7cb4c48 100644 --- a/utils/glob.ds +++ b/utils/glob.ds @@ -1,5 +1,5 @@ fn glob_paths_excluding_target - + handle = glob_array ${1} out = array cargo_home = pwd