From 183e057e824b2b52af4d07b68cc4500779d46420 Mon Sep 17 00:00:00 2001 From: Steve Meyer Date: Thu, 4 Nov 2021 13:04:33 -0700 Subject: [PATCH] refactor realtime delay --- console_backend/src/connection.rs | 6 +- console_backend/src/main_tab.rs | 87 +---------- console_backend/src/process_messages.rs | 200 +++++++++++++++++++----- 3 files changed, 167 insertions(+), 126 deletions(-) diff --git a/console_backend/src/connection.rs b/console_backend/src/connection.rs index 28ddfce9c..f3f2b3278 100644 --- a/console_backend/src/connection.rs +++ b/console_backend/src/connection.rs @@ -122,7 +122,11 @@ fn conn_manager_thd( continue; } }; - let (messages, stop_token) = Messages::from_reader(reader); + let (messages, stop_token) = if conn.realtime_delay() == RealtimeDelay::On { + Messages::with_realtime_delay(reader) + } else { + Messages::new(reader) + }; let msg_sender = MsgSender::new(writer); shared_state.set_connection( ConnectionState::Connected { diff --git a/console_backend/src/main_tab.rs b/console_backend/src/main_tab.rs index 91d16d3c9..b168d87a6 100644 --- a/console_backend/src/main_tab.rs +++ b/console_backend/src/main_tab.rs @@ -1,15 +1,13 @@ use std::{ path::PathBuf, - result::Result, - thread::sleep, time::{Duration, Instant}, }; use capnp::message::Builder; use chrono::Local; use crossbeam::channel::Receiver; -use log::{debug, error}; -use sbp::{time::GpsTime, Sbp}; +use log::error; +use sbp::Sbp; use crate::client_sender::BoxedClientSender; use crate::constants::{ @@ -88,8 +86,6 @@ pub struct MainTab { last_sbp_logging: bool, last_sbp_logging_format: SbpLogging, sbp_logger: Option, - last_gps_update: Instant, - last_gps_time: Option, client_sender: BoxedClientSender, shared_state: SharedState, } @@ -102,40 +98,11 @@ impl MainTab { last_sbp_logging: false, last_sbp_logging_format: SbpLogging::SBP_JSON, sbp_logger: None, - last_gps_time: None, - last_gps_update: Instant::now(), client_sender, shared_state, } } - /// Calculate time since last epoch began and sleep for previous epoch time difference. - /// - /// # Parameters - /// - `gps_time`: The GpsTime corresponding to a message. - pub fn realtime_delay(&mut self, gps_time: Option>) { - if let Some(Ok(g_time)) = gps_time { - if let Some(l_time) = self.last_gps_time { - if l_time < g_time { - let diff = g_time - l_time; - let elapsed = self.last_gps_update.elapsed(); - if diff > elapsed { - let sleep_duration = diff - elapsed; - debug!( - "Realtime delay encounterred. Sleeping for {:?}.", - sleep_duration - ); - sleep(sleep_duration); - } - self.last_gps_update = Instant::now(); - self.last_gps_time = Some(g_time); - } - } else { - self.last_gps_time = Some(g_time); - } - } - } - /// Initialize Baseline and Solution Position and Velocity Loggers. /// /// # Generates: @@ -294,59 +261,9 @@ mod tests { use std::{ fs::File, io::{sink, BufRead, BufReader}, - time::Duration, }; use tempfile::TempDir; - struct GpsTimeTests { - pub good_week: i16, - pub early_gps_tow_good: f64, - pub later_gps_tow_good: f64, - } - impl GpsTimeTests { - fn new() -> GpsTimeTests { - let good_week: i16 = 2000; - let early_gps_tow_good: f64 = 5432.0; - let later_gps_tow_good: f64 = 5433.0; - GpsTimeTests { - good_week, - early_gps_tow_good, - later_gps_tow_good, - } - } - } - - #[test] - fn realtime_delay_full_test() { - let shared_state = SharedState::new(); - let client_send = TestSender::boxed(); - let gps_s = GpsTimeTests::new(); - let mut main = MainTab::new(shared_state, client_send); - let early_gps_time_good = GpsTime::new(gps_s.good_week, gps_s.early_gps_tow_good).unwrap(); - let later_gps_time_good = GpsTime::new(gps_s.good_week, gps_s.later_gps_tow_good); - main.last_gps_time = Some(early_gps_time_good); - let now = Instant::now(); - main.last_gps_update = Instant::now(); - main.realtime_delay(Some(later_gps_time_good)); - assert!( - now.elapsed() - > Duration::from_secs_f64(gps_s.later_gps_tow_good - gps_s.early_gps_tow_good) - ); - } - - #[test] - fn realtime_delay_no_last_test() { - let shared_state = SharedState::new(); - let client_send = TestSender::boxed(); - let gps_s = GpsTimeTests::new(); - let mut main = MainTab::new(shared_state, client_send); - let later_gps_time_good = GpsTime::new(gps_s.good_week, gps_s.later_gps_tow_good); - let now = Instant::now(); - main.last_gps_update = Instant::now(); - main.realtime_delay(Some(later_gps_time_good)); - assert!(now.elapsed() < Duration::from_millis(5)); - } - #[test] fn csv_logging_test() { let tmp_dir = TempDir::new().unwrap(); diff --git a/console_backend/src/process_messages.rs b/console_backend/src/process_messages.rs index 70d77f774..88f808323 100644 --- a/console_backend/src/process_messages.rs +++ b/console_backend/src/process_messages.rs @@ -1,3 +1,5 @@ +use std::io; + use crossbeam::channel::{bounded, Receiver, Sender}; use log::{debug, error}; use sbp::{ @@ -15,14 +17,12 @@ use sbp::{ }, tracking::{MsgMeasurementState, MsgTrackingState}, }, - SbpMessage, + Sbp, SbpMessage, }; -use std::io; use crate::client_sender::BoxedClientSender; use crate::types::{ - BaselineNED, Dops, GpsTime, MsgSender, ObservationMsg, PosLLH, RealtimeDelay, Specan, - UartState, VelNED, + BaselineNED, Dops, GpsTime, MsgSender, ObservationMsg, PosLLH, Specan, UartState, VelNED, }; use crate::utils::refresh_connection_frontend; use crate::Tabs; @@ -245,6 +245,12 @@ pub fn process_messages( handle_log_msg(msg); }); + link.register(|tabs: &Tabs, msg: Sbp| { + tabs.main.lock().unwrap().serialize_sbp(&msg); + tabs.status_bar.lock().unwrap().add_bytes(msg.encoded_len()); + tabs.advanced_networking.lock().unwrap().handle_sbp(&msg); + }); + let update_tab_context = tabs .update .lock() @@ -288,28 +294,8 @@ pub fn process_messages( settings_tab::start_thd(tab); }); } - - for (message, gps_time) in &mut messages { - let sent = source.send_with_state(&tabs, &message); - tabs.main.lock().unwrap().serialize_sbp(&message); - tabs.status_bar - .lock() - .unwrap() - .add_bytes(message.encoded_len()); - tabs.advanced_networking - .lock() - .unwrap() - .handle_sbp(&message); - if let RealtimeDelay::On = conn.realtime_delay() { - if sent { - tabs.main.lock().unwrap().realtime_delay(gps_time); - } else { - debug!( - "Message, {}, ignored for realtime delay.", - message.message_name() - ); - } - } + for (message, _) in &mut messages { + source.send_with_state(&tabs, &message); log::logger().flush(); } if let Some(ref tab) = settings_tab { @@ -332,7 +318,12 @@ pub fn process_messages( } mod messages { - use std::{fmt, io, sync::Arc, thread, time::Duration}; + use std::{ + fmt, io, + sync::Arc, + thread, + time::{Duration, Instant}, + }; use crossbeam::channel::{self, Receiver, Sender}; use log::debug; @@ -357,30 +348,39 @@ mod messages { impl Messages { const TIMEOUT: Duration = Duration::from_secs(30); - pub fn new(inner: MessageWithTimeIter) -> (Self, StopToken) { - let (stop_token, stop_recv) = StopToken::new(); - let messages = start_read_thd(inner); - ( - Self { - messages, - stop_recv, - err: Ok(()), - }, - stop_token, - ) + pub fn new(reader: R) -> (Self, StopToken) + where + R: io::Read + Send + 'static, + { + let messages = sbp::iter_messages(reader).with_rover_time(); + Self::from_boxed(Box::new(messages)) } - pub fn from_reader(reader: R) -> (Self, StopToken) + pub fn with_realtime_delay(reader: R) -> (Self, StopToken) where R: io::Read + Send + 'static, { let messages = sbp::iter_messages(reader).with_rover_time(); - Self::new(Box::new(messages)) + let messages = Box::new(RealtimeIter::new(messages)); + Self::from_boxed(messages) } pub fn take_err(&mut self) -> Result<(), io::Error> { std::mem::replace(&mut self.err, Ok(())) } + + fn from_boxed(inner: MessageWithTimeIter) -> (Self, StopToken) { + let (stop_token, stop_recv) = StopToken::new(); + let messages = start_read_thd(inner); + ( + Self { + messages, + stop_recv, + err: Ok(()), + }, + stop_token, + ) + } } impl Iterator for Messages { @@ -424,6 +424,52 @@ mod messages { rx } + struct RealtimeIter { + messages: M, + last_time: Option, + updated_at: Instant, + } + + impl RealtimeIter { + fn new(messages: M) -> Self { + Self { + messages, + last_time: None, + updated_at: Instant::now(), + } + } + } + + impl Iterator for RealtimeIter + where + M: Iterator, + { + type Item = M::Item; + + fn next(&mut self) -> Option { + let msg = self.messages.next()?; + match (self.last_time, &msg.1) { + (Some(last_time), Some(Ok(time))) if &last_time < time => { + let diff = *time - last_time; + let elapsed = self.updated_at.elapsed(); + if diff > elapsed { + let sleep_dur = diff - elapsed; + debug!("Realtime delay sleeping for {:?}", sleep_dur); + thread::sleep(sleep_dur); + } + self.last_time = Some(*time); + self.updated_at = Instant::now(); + } + (None, Some(Ok(time))) => { + self.last_time = Some(*time); + self.updated_at = Instant::now(); + } + _ => (), + }; + Some(msg) + } + } + /// Used to stop the [Messages](super::Messages) iterator. This can be called manually, /// but will automatically be called after all copies of this token have been dropped. pub struct StopToken(Arc); @@ -470,4 +516,78 @@ mod messages { self.stop(); } } + + #[cfg(test)] + mod tests { + use std::io::Cursor; + use std::time::Duration; + + use sbp::messages::logging::MsgLog; + use sbp::messages::navigation::MsgGpsTime; + + use super::*; + + // wiggle room for timing the delay + const JIFFY: Duration = Duration::from_millis(1); + + fn msg_gps_time(tow: u32) -> Sbp { + MsgGpsTime { + sender_id: Some(0), + wn: 1, + tow, + ns_residual: 1, + flags: 1, + } + .into() + } + + // any message without time would do + fn msg_log() -> Sbp { + MsgLog { + sender_id: Some(0), + level: 1, + text: String::from("hello").into(), + } + .into() + } + + #[test] + fn realtime_delay() { + let mut data = Vec::new(); + sbp::to_writer(&mut data, &msg_log()).unwrap(); + sbp::to_writer(&mut data, &msg_gps_time(1000)).unwrap(); + sbp::to_writer(&mut data, &msg_log()).unwrap(); + sbp::to_writer(&mut data, &msg_gps_time(2000)).unwrap(); // one second from the last MsgGpsTime + let (messages, _token) = Messages::with_realtime_delay(Cursor::new(data)); + let start = Instant::now(); + assert_eq!(messages.count(), 4); + assert!(start.elapsed() - Duration::from_secs(1) < JIFFY); + } + + #[test] + fn no_realtime_delay() { + let mut data = Vec::new(); + sbp::to_writer(&mut data, &msg_log()).unwrap(); + sbp::to_writer(&mut data, &msg_gps_time(1000)).unwrap(); + sbp::to_writer(&mut data, &msg_log()).unwrap(); + sbp::to_writer(&mut data, &msg_gps_time(2000)).unwrap(); + let (messages, _token) = Messages::new(Cursor::new(data)); + let start = Instant::now(); + assert_eq!(messages.count(), 4); + assert!(start.elapsed() < JIFFY); + } + + #[test] + fn realtime_delay_no_last_time() { + let mut data = Vec::new(); + sbp::to_writer(&mut data, &msg_log()).unwrap(); + sbp::to_writer(&mut data, &msg_gps_time(1000)).unwrap(); + sbp::to_writer(&mut data, &msg_log()).unwrap(); + let (messages, _token) = Messages::with_realtime_delay(Cursor::new(data)); + let start = Instant::now(); + assert_eq!(messages.count(), 3); + // only one message with time so no delay should have been added + assert!(start.elapsed() < JIFFY); + } + } }