Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions console_backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ crossbeam = "0.8.1"
rand = "0.8.3"
slotmap = "1.0.3"
serde-pickle = { version = "0.6.2", optional = true }
parking_lot = "0.11.1"

[target.'cfg(any(target_os = "macos", target_os = "windows"))'.dependencies]
serialport = "4.0.1"
Expand Down
10 changes: 5 additions & 5 deletions console_backend/src/baseline_tab.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub(crate) struct BaselineTabButtons {
/// - `utc_source`: The string equivalent for the source of the UTC updates.
/// - `utc_time`: The stored monotonic Utc time.
/// - `week`: The stored week value from GPS Time messages.
pub struct BaselineTab<'a, S: CapnProtoSender> {
pub struct BaselineTab<S: CapnProtoSender> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the existing lifetimes on tabs, because the references they were using were 'static so they were unneeded

age_corrections: Option<f64>,
client_sender: S,
heading: Option<f64>,
Expand All @@ -68,16 +68,16 @@ pub struct BaselineTab<'a, S: CapnProtoSender> {
shared_state: SharedState,
sln_cur_data: Vec<Vec<(f64, f64)>>,
sln_data: Vec<Vec<(f64, f64)>>,
slns: HashMap<&'a str, Deque<f64>>,
table: HashMap<&'a str, String>,
slns: HashMap<&'static str, Deque<f64>>,
table: HashMap<&'static str, String>,
utc_source: Option<String>,
utc_time: Option<UtcDateTime>,
week: Option<u16>,
wtr: MsgSender,
}

impl<'a, S: CapnProtoSender> BaselineTab<'a, S> {
pub fn new(shared_state: SharedState, client_sender: S, wtr: MsgSender) -> BaselineTab<'a, S> {
impl<S: CapnProtoSender> BaselineTab<S> {
pub fn new(shared_state: SharedState, client_sender: S, wtr: MsgSender) -> BaselineTab<S> {
BaselineTab {
age_corrections: None,
client_sender,
Expand Down
109 changes: 82 additions & 27 deletions console_backend/src/broadcaster.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
use std::{
convert::TryInto,
marker::PhantomData,
sync::{Arc, Mutex},
time::Duration,
};
use std::{convert::TryInto, marker::PhantomData, sync::Arc, time::Duration};

use crossbeam::channel;
use parking_lot::Mutex;
Copy link
Collaborator

@john-michaelburke john-michaelburke Sep 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do we gain with the parkinglot mutex? Looks like it doesnt require unwrapping the lock.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what specifically we needed for this change though

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah another thing I needed for the settings tab. It has a function to unlock a mutex without a guard (https://docs.rs/lock_api/0.4.5/lock_api/struct.Mutex.html#method.force_unlock) which I needed for the FFI stuff. Don't actually need it here I guess.

They are supposed to be faster, and they don't allocate their contents on the heap like std mutexs. You don't have to unwrap them because they don't 'poison'. So if a thread holding a mutex panics it just releases the mutex. Which is supposed to be fine for 'normal' panics like unwrap() and stuff. The poisoning is an extra safeguard against panics that leave the contents of the mutex in an 'undefined' state which is apparently very uncommon

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds like it should be safe for us then too

use sbp::{
messages::{ConcreteMessage, SBPMessage, SBP},
time::{GpsTime, GpsTimeError},
Expand All @@ -19,8 +15,6 @@ pub struct Broadcaster {
}

impl Broadcaster {
const CHANNELS_LOCK_FAILURE: &'static str = "failed to aquire lock on channels";

pub fn new() -> Self {
Self {
channels: Arc::new(Mutex::new(HopSlotMap::with_key())),
Expand All @@ -29,7 +23,7 @@ impl Broadcaster {

pub fn send(&self, message: &SBP, gps_time: MaybeGpsTime) {
let msg_type = message.get_message_type();
let mut channels = self.channels.lock().expect(Self::CHANNELS_LOCK_FAILURE);
let mut channels = self.channels.lock();
channels.retain(|_, chan| {
if chan.msg_types.iter().any(|ty| ty == &msg_type) {
chan.inner.send((message.clone(), gps_time.clone())).is_ok()
Expand All @@ -44,7 +38,7 @@ impl Broadcaster {
E: Event,
{
let (tx, rx) = channel::unbounded();
let mut channels = self.channels.lock().expect(Self::CHANNELS_LOCK_FAILURE);
let mut channels = self.channels.lock();
let key = channels.insert(Sender {
inner: tx,
msg_types: E::MESSAGE_TYPES,
Expand All @@ -59,7 +53,7 @@ impl Broadcaster {
}

pub fn unsubscribe(&self, key: Key) {
let mut channels = self.channels.lock().expect(Self::CHANNELS_LOCK_FAILURE);
let mut channels = self.channels.lock();
channels.remove(key.inner);
}

Expand Down Expand Up @@ -206,18 +200,69 @@ where
}
}

struct Callback<'a> {
func: Box<dyn FnMut(SBP, MaybeGpsTime) + Send + 'a>,
msg_types: &'static [u16],
enum Callback<'a> {
Id {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed id based callbacks for settings stuff, it was easier to just include it there than breaking it out

func: Box<dyn FnMut(SBP, MaybeGpsTime) + Send + 'a>,
msg_type: u16,
},
Event {
func: Box<dyn FnMut(SBP, MaybeGpsTime) + Send + 'a>,
msg_types: &'static [u16],
},
}

impl<'a> Callback<'a> {
fn run(&mut self, msg: SBP, time: MaybeGpsTime) {
match self {
Callback::Id { func, .. } | Callback::Event { func, .. } => (func)(msg, time),
}
}

fn should_run(&self, msg: u16) -> bool {
match self {
Callback::Id { msg_type, .. } => msg_type == &msg,
Callback::Event { msg_types, .. } => msg_types.contains(&msg),
}
}
}

pub fn with_link<'env, F>(f: F)
where
F: FnOnce(&LinkSource<'env>),
{
let source: LinkSource<'env> = LinkSource { link: Link::new() };
f(&source);
}

pub struct LinkSource<'env> {
link: Link<'env>,
}

impl<'env> LinkSource<'env> {
pub fn link<'scope>(&self) -> Link<'scope>
where
'env: 'scope,
{
let link: Link<'scope> = unsafe { std::mem::transmute(self.link.clone()) };
link
}
Comment on lines +242 to +248
Copy link
Contributor Author

@notoriaga notoriaga Sep 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This lets you create Links from LinkSource with distinct lifetimes ('scope), that are at most as long as 'env (the 'parent' lifetime). This basically forces the behavior that @silverjam was talking about w.r.t. how clone should intuitively behave in this case.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense to me!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


pub fn send(&self, message: &SBP, gps_time: MaybeGpsTime) -> bool {
self.link.send(message, gps_time)
}
}

impl<'env> Drop for LinkSource<'env> {
fn drop(&mut self) {
self.link.callbacks.lock().clear();
}
}

pub struct Link<'a> {
callbacks: Arc<Mutex<DenseSlotMap<KeyInner, Callback<'a>>>>,
}

impl<'a> Link<'a> {
const CB_LOCK_FAILURE: &'static str = "failed to aquire lock on callbacks";

pub fn new() -> Self {
Self {
callbacks: Arc::new(Mutex::new(DenseSlotMap::with_key())),
Expand All @@ -226,25 +271,35 @@ impl<'a> Link<'a> {

pub fn send(&self, message: &SBP, gps_time: MaybeGpsTime) -> bool {
let msg_type = message.get_message_type();
let mut cbs = self.callbacks.lock().expect(Self::CB_LOCK_FAILURE);
let to_call = cbs
.values_mut()
.filter(|cb| cb.msg_types.is_empty() || cb.msg_types.iter().any(|ty| ty == &msg_type));
let mut cbs = self.callbacks.lock();
let to_call = cbs.values_mut().filter(|cb| cb.should_run(msg_type));
let mut called = false;
for cb in to_call {
(cb.func)(message.clone(), gps_time.clone());
cb.run(message.clone(), gps_time.clone());
called = true;
}
called
}

pub fn register_cb<H, E, K>(&mut self, mut handler: H) -> Key
pub fn register_cb_by_id<H>(&self, id: u16, mut handler: H) -> Key
where
H: FnMut(SBP) + Send + 'a,
{
let mut cbs = self.callbacks.lock();
let inner = cbs.insert(Callback::Id {
func: Box::new(move |msg, _time| (handler)(msg)),
msg_type: id,
});
Key { inner }
}

pub fn register_cb<H, E, K>(&self, mut handler: H) -> Key
where
H: Handler<E, K> + Send + 'a,
E: Event,
{
let mut cbs = self.callbacks.lock().expect(Self::CB_LOCK_FAILURE);
let inner = cbs.insert(Callback {
let mut cbs = self.callbacks.lock();
let inner = cbs.insert(Callback::Event {
func: Box::new(move |msg, time| {
let event = E::from_sbp(msg);
handler.run(event, time)
Expand All @@ -255,7 +310,7 @@ impl<'a> Link<'a> {
}

pub fn unregister_cb(&self, key: Key) {
let mut cbs = self.callbacks.lock().expect(Self::CB_LOCK_FAILURE);
let mut cbs = self.callbacks.lock();
cbs.remove(key.inner);
}
}
Expand All @@ -268,7 +323,7 @@ impl Clone for Link<'_> {
}
}

impl Default for Link<'_> {
impl<'a> Default for Link<'a> {
fn default() -> Self {
Self::new()
}
Expand Down Expand Up @@ -296,7 +351,7 @@ mod tests {

#[test]
fn test_dispatcher() {
let mut d = Link::new();
let d = Link::new();
let mut c = Counter { count: 0 };
d.register_cb(|obs| c.obs(obs));
d.send(&make_msg_obs(), None);
Expand Down
11 changes: 7 additions & 4 deletions console_backend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,24 +42,26 @@ use crate::{
tracking_signals_tab::TrackingSignalsTab,
};

struct Tabs<'a, S: types::CapnProtoSender> {
struct Tabs<'link, S: types::CapnProtoSender> {
pub main: Mutex<MainTab<S>>,
pub advanced_ins: Mutex<AdvancedInsTab<S>>,
pub advanced_magnetometer: Mutex<AdvancedMagnetometerTab<S>>,
pub baseline: Mutex<BaselineTab<'a, S>>,
pub baseline: Mutex<BaselineTab<S>>,
pub tracking_signals: Mutex<TrackingSignalsTab<S>>,
pub solution: Mutex<SolutionTab<S>>,
pub observation: Mutex<ObservationTab<S>>,
pub solution_velocity: Mutex<SolutionVelocityTab<'a, S>>,
pub solution_velocity: Mutex<SolutionVelocityTab<S>>,
pub advanced_spectrum_analyzer: Mutex<AdvancedSpectrumAnalyzerTab<S>>,
pub status_bar: Mutex<StatusBar<S>>,
_link: broadcaster::Link<'link>,
Copy link
Contributor Author

@notoriaga notoriaga Sep 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this just to show that everything compiles when Tabs is holding a Link (so should work for Fileio as well)

}

impl<'a, S: types::CapnProtoSender> Tabs<'a, S> {
impl<'link, S: types::CapnProtoSender> Tabs<'link, S> {
fn new(
shared_state: types::SharedState,
client_sender: S,
msg_sender: types::MsgSender,
link: broadcaster::Link<'link>,
) -> Self {
Self {
main: MainTab::new(shared_state.clone(), client_sender.clone()).into(),
Expand All @@ -86,6 +88,7 @@ impl<'a, S: types::CapnProtoSender> Tabs<'a, S> {
)
.into(),
status_bar: StatusBar::new(shared_state, client_sender).into(),
_link: link,
}
}
}
Expand Down
Loading