Skip to content

Commit fce9caf

Browse files
authored
Callbacks (#106)
1 parent 74d2dbe commit fce9caf

File tree

12 files changed

+750
-432
lines changed

12 files changed

+750
-432
lines changed

console_backend/src/baseline_tab.rs

Lines changed: 26 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
1-
use capnp::message::Builder;
1+
use std::collections::HashMap;
22

3+
use capnp::message::Builder;
34
use log::error;
45
use sbp::messages::{
56
navigation::{MsgAgeCorrections, MsgUtcTime},
67
orientation::MsgBaselineHeading,
78
piksi::MsgResetFilters,
89
};
9-
use std::{collections::HashMap, io::Write};
1010

1111
use crate::constants::*;
1212
use crate::date_conv::*;
13-
use crate::output::{BaselineLog, CsvSerializer};
13+
use crate::output::BaselineLog;
1414
use crate::piksi_tools_constants::EMPTY_STR;
1515
use crate::types::{
1616
BaselineNED, CapnProtoSender, Deque, GnssModes, GpsTime, MsgSender, Result, SharedState,
@@ -52,9 +52,8 @@ pub(crate) struct BaselineTabButtons {
5252
/// - `table`: This stores all the key/value pairs to be displayed in the Baseline Table.
5353
/// - `utc_source`: The string equivalent for the source of the UTC updates.
5454
/// - `utc_time`: The stored monotonic Utc time.
55-
/// - `baseline_log_file`: The CsvSerializer corresponding to an open velocity log if any.
5655
/// - `week`: The stored week value from GPS Time messages.
57-
pub struct BaselineTab<'a, S: CapnProtoSender, W: Write> {
56+
pub struct BaselineTab<'a, S: CapnProtoSender> {
5857
age_corrections: Option<f64>,
5958
client_sender: S,
6059
heading: Option<f64>,
@@ -73,17 +72,12 @@ pub struct BaselineTab<'a, S: CapnProtoSender, W: Write> {
7372
table: HashMap<&'a str, String>,
7473
utc_source: Option<String>,
7574
utc_time: Option<UtcDateTime>,
76-
pub baseline_log_file: Option<CsvSerializer>,
7775
week: Option<u16>,
78-
wtr: MsgSender<W>,
76+
wtr: MsgSender,
7977
}
8078

81-
impl<'a, S: CapnProtoSender, W: Write> BaselineTab<'a, S, W> {
82-
pub fn new(
83-
shared_state: SharedState,
84-
client_sender: S,
85-
wtr: MsgSender<W>,
86-
) -> BaselineTab<'a, S, W> {
79+
impl<'a, S: CapnProtoSender> BaselineTab<'a, S> {
80+
pub fn new(shared_state: SharedState, client_sender: S, wtr: MsgSender) -> BaselineTab<'a, S> {
8781
BaselineTab {
8882
age_corrections: None,
8983
client_sender,
@@ -122,7 +116,6 @@ impl<'a, S: CapnProtoSender, W: Write> BaselineTab<'a, S, W> {
122116
},
123117
utc_source: None,
124118
utc_time: None,
125-
baseline_log_file: None,
126119
week: None,
127120
wtr,
128121
}
@@ -294,22 +287,25 @@ impl<'a, S: CapnProtoSender, W: Write> BaselineTab<'a, S, W> {
294287
}
295288
}
296289

297-
if let Some(baseline_file) = &mut self.baseline_log_file {
298-
let pc_time = format!("{}:{:0>6.06}", tloc, secloc);
299-
if let Err(err) = baseline_file.serialize(&BaselineLog {
300-
pc_time,
301-
gps_time,
302-
tow_s: Some(tow),
303-
north_m: Some(n),
304-
east_m: Some(e),
305-
down_m: Some(d),
306-
h_accuracy_m: Some(h_accuracy),
307-
v_accuracy_m: Some(v_accuracy),
308-
distance_m: Some(dist),
309-
flags: baseline_ned_fields.flags,
310-
num_sats: baseline_ned_fields.n_sats,
311-
}) {
312-
eprintln!("Unable to to write to baseline log, error {}.", err);
290+
{
291+
let mut shared_data = self.shared_state.lock().unwrap();
292+
if let Some(ref mut baseline_file) = (*shared_data).baseline_tab.log_file {
293+
let pc_time = format!("{}:{:0>6.06}", tloc, secloc);
294+
if let Err(err) = baseline_file.serialize(&BaselineLog {
295+
pc_time,
296+
gps_time,
297+
tow_s: Some(tow),
298+
north_m: Some(n),
299+
east_m: Some(e),
300+
down_m: Some(d),
301+
h_accuracy_m: Some(h_accuracy),
302+
v_accuracy_m: Some(v_accuracy),
303+
distance_m: Some(dist),
304+
flags: baseline_ned_fields.flags,
305+
num_sats: baseline_ned_fields.n_sats,
306+
}) {
307+
eprintln!("Unable to to write to baseline log, error {}.", err);
308+
}
313309
}
314310
}
315311

console_backend/src/bin/fft_monitor.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use anyhow::Context;
12
use clap::Clap;
23
use crossbeam::{channel, scope};
34
use sbp::{messages::piksi::MsgSpecan, sbp_tools::SBPTools};
@@ -60,7 +61,10 @@ fn main() -> Result<()> {
6061
println!("Writing to file: {}", &filename);
6162
let channel = opts.channel;
6263
let num_ffts = opts.num_ffts;
63-
let (rdr, _) = opts.into_conn().try_connect(/*shared_state=*/ None)?;
64+
let (rdr, _) = opts
65+
.into_conn()
66+
.try_connect(/*shared_state=*/ None)
67+
.context("while connecting")?;
6468

6569
scope(|s| {
6670
s.spawn(|_| run(rdr));

console_backend/src/bin/fileio.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crossbeam::{channel, scope};
88
use sbp::sbp_tools::SBPTools;
99

1010
use console_backend::{
11-
broadcaster::Broadcaster,
11+
broadcaster::Link,
1212
cli_options::Input,
1313
fileio::Fileio,
1414
types::{MsgSender, Result},
@@ -49,15 +49,15 @@ pub enum Opts {
4949
}
5050

5151
fn main() -> Result<()> {
52-
let bc = Broadcaster::new();
52+
let link = Link::new();
53+
let link_source = link.clone();
5354

5455
let (done_tx, done_rx) = channel::bounded(0);
5556

56-
let bc_source = bc.clone();
5757
let run = move |rdr| {
5858
let messages = sbp::iter_messages(rdr).log_errors(log::Level::Debug);
5959
for msg in messages {
60-
bc_source.send(&msg, None);
60+
link_source.send(&msg, None);
6161
if done_rx.try_recv().is_ok() {
6262
break;
6363
}
@@ -74,7 +74,7 @@ fn main() -> Result<()> {
7474
let sender = MsgSender::new(wtr);
7575
scope(|s| {
7676
s.spawn(|_| run(rdr));
77-
let mut fileio = Fileio::new(bc, sender);
77+
let mut fileio = Fileio::new(link, sender);
7878
let data = fs::File::open(source)?;
7979
fileio.overwrite(dest, data)?;
8080
eprintln!("file written successfully.");
@@ -92,7 +92,7 @@ fn main() -> Result<()> {
9292
let sender = MsgSender::new(wtr);
9393
scope(|s| {
9494
s.spawn(|_| run(rdr));
95-
let mut fileio = Fileio::new(bc, sender);
95+
let mut fileio = Fileio::new(link, sender);
9696
let dest: Box<dyn Write> = match dest {
9797
Some(path) => Box::new(fs::File::create(path)?),
9898
None => Box::new(io::stdout()),
@@ -108,7 +108,7 @@ fn main() -> Result<()> {
108108
let sender = MsgSender::new(wtr);
109109
scope(|s| {
110110
s.spawn(|_| run(rdr));
111-
let mut fileio = Fileio::new(bc, sender);
111+
let mut fileio = Fileio::new(link, sender);
112112
let files = fileio.readdir(path)?;
113113
eprintln!("{:#?}", files);
114114
done_tx.send(true).unwrap();
@@ -121,7 +121,7 @@ fn main() -> Result<()> {
121121
let sender = MsgSender::new(wtr);
122122
scope(|s| {
123123
s.spawn(|_| run(rdr));
124-
let fileio = Fileio::new(bc, sender);
124+
let fileio = Fileio::new(link, sender);
125125
fileio.remove(path)?;
126126
eprintln!("file deleted.");
127127
done_tx.send(true).unwrap();

console_backend/src/broadcaster.rs

Lines changed: 132 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use sbp::{
1010
messages::{ConcreteMessage, SBPMessage, SBP},
1111
time::{GpsTime, GpsTimeError},
1212
};
13-
use slotmap::HopSlotMap;
13+
use slotmap::{DenseSlotMap, HopSlotMap};
1414

1515
type MaybeGpsTime = Option<Result<GpsTime, GpsTimeError>>;
1616

@@ -164,6 +164,116 @@ where
164164
}
165165
}
166166

167+
pub trait Handler<Event, Kind> {
168+
fn run(&mut self, event: Event, time: MaybeGpsTime);
169+
}
170+
171+
pub struct WithTime;
172+
173+
impl<F, E> Handler<E, WithTime> for F
174+
where
175+
F: FnMut(E, MaybeGpsTime),
176+
E: Event,
177+
{
178+
fn run(&mut self, event: E, time: MaybeGpsTime) {
179+
(self)(event, time)
180+
}
181+
}
182+
183+
pub struct WithoutTime;
184+
185+
impl<F, E> Handler<E, WithoutTime> for F
186+
where
187+
F: FnMut(E),
188+
E: Event,
189+
{
190+
fn run(&mut self, event: E, _time: MaybeGpsTime) {
191+
(self)(event)
192+
}
193+
}
194+
195+
pub struct OnlyTime;
196+
197+
impl<F, E> Handler<E, OnlyTime> for F
198+
where
199+
F: FnMut(GpsTime),
200+
E: Event,
201+
{
202+
fn run(&mut self, _event: E, time: MaybeGpsTime) {
203+
if let Some(Ok(time)) = time {
204+
(self)(time)
205+
}
206+
}
207+
}
208+
209+
struct Callback<'a> {
210+
func: Box<dyn FnMut(SBP, MaybeGpsTime) + Send + 'a>,
211+
msg_types: &'static [u16],
212+
}
213+
214+
pub struct Link<'a> {
215+
callbacks: Arc<Mutex<DenseSlotMap<KeyInner, Callback<'a>>>>,
216+
}
217+
218+
impl<'a> Link<'a> {
219+
const CB_LOCK_FAILURE: &'static str = "failed to aquire lock on callbacks";
220+
221+
pub fn new() -> Self {
222+
Self {
223+
callbacks: Arc::new(Mutex::new(DenseSlotMap::with_key())),
224+
}
225+
}
226+
227+
pub fn send(&self, message: &SBP, gps_time: MaybeGpsTime) -> bool {
228+
let msg_type = message.get_message_type();
229+
let mut cbs = self.callbacks.lock().expect(Self::CB_LOCK_FAILURE);
230+
let to_call = cbs
231+
.values_mut()
232+
.filter(|cb| cb.msg_types.is_empty() || cb.msg_types.iter().any(|ty| ty == &msg_type));
233+
let mut called = false;
234+
for cb in to_call {
235+
(cb.func)(message.clone(), gps_time.clone());
236+
called = true;
237+
}
238+
called
239+
}
240+
241+
pub fn register_cb<H, E, K>(&mut self, mut handler: H) -> Key
242+
where
243+
H: Handler<E, K> + Send + 'a,
244+
E: Event,
245+
{
246+
let mut cbs = self.callbacks.lock().expect(Self::CB_LOCK_FAILURE);
247+
let inner = cbs.insert(Callback {
248+
func: Box::new(move |msg, time| {
249+
let event = E::from_sbp(msg);
250+
handler.run(event, time)
251+
}),
252+
msg_types: E::MESSAGE_TYPES,
253+
});
254+
Key { inner }
255+
}
256+
257+
pub fn unregister_cb(&self, key: Key) {
258+
let mut cbs = self.callbacks.lock().expect(Self::CB_LOCK_FAILURE);
259+
cbs.remove(key.inner);
260+
}
261+
}
262+
263+
impl Clone for Link<'_> {
264+
fn clone(&self) -> Self {
265+
Self {
266+
callbacks: Arc::clone(&self.callbacks),
267+
}
268+
}
269+
}
270+
271+
impl Default for Link<'_> {
272+
fn default() -> Self {
273+
Self::new()
274+
}
275+
}
276+
167277
#[cfg(test)]
168278
mod tests {
169279
use crossbeam::scope;
@@ -174,6 +284,27 @@ mod tests {
174284

175285
use super::*;
176286

287+
struct Counter {
288+
count: usize,
289+
}
290+
291+
impl Counter {
292+
fn obs(&mut self, _: MsgObs) {
293+
self.count += 1;
294+
}
295+
}
296+
297+
#[test]
298+
fn test_dispatcher() {
299+
let mut d = Link::new();
300+
let mut c = Counter { count: 0 };
301+
d.register_cb(|obs| c.obs(obs));
302+
d.send(&make_msg_obs(), None);
303+
d.send(&make_msg_obs_dep_a(), None);
304+
drop(d);
305+
assert_eq!(c.count, 1);
306+
}
307+
177308
#[test]
178309
fn test_broadcaster() {
179310
let b = Broadcaster::new();

console_backend/src/connection.rs

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::constants::*;
22
use crate::errors::*;
33
use crate::process_messages::process_messages;
44
use crate::types::*;
5-
use chrono::{DateTime, Utc};
5+
use anyhow::anyhow;
66
use crossbeam::channel::{unbounded, Receiver, Sender};
77
use log::{error, info};
88
use std::{
@@ -16,10 +16,6 @@ use std::{
1616
time::Duration,
1717
};
1818

19-
pub type Error = std::boxed::Box<dyn std::error::Error>;
20-
pub type Result<T> = std::result::Result<T, Error>;
21-
pub type UtcDateTime = DateTime<Utc>;
22-
2319
#[derive(Clone)]
2420
pub struct TcpConnection {
2521
name: String,
@@ -33,13 +29,11 @@ impl TcpConnection {
3329
}
3430
fn socket_addrs(name: String) -> Result<SocketAddr> {
3531
let socket = &mut name.to_socket_addrs()?;
36-
let socket = if let Some(socket_) = socket.next() {
37-
socket_
32+
if let Some(s) = socket.next() {
33+
Ok(s)
3834
} else {
39-
let e: Box<dyn std::error::Error> = String::from(TCP_CONNECTION_PARSING_FAILURE).into();
40-
return Err(e);
41-
};
42-
Ok(socket)
35+
Err(anyhow!("{}", TCP_CONNECTION_PARSING_FAILURE))
36+
}
4337
}
4438
fn name(&self) -> String {
4539
self.name.clone()

console_backend/src/fft_monitor.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::constants::{AMPLITUDES, CHANNELS, FREQUENCIES, SIGNALS_TOTAL};
22
use crate::types::{Result, Specan};
3+
use anyhow::bail;
34
use serde::Serialize;
45
use std::{
56
collections::HashMap,
@@ -111,7 +112,7 @@ impl FftMonitor {
111112
if let Some(freqs) = fft.get(FREQUENCIES) {
112113
if let Some(amps) = fft.get(AMPLITUDES) {
113114
if freqs.len() != amps.len() {
114-
return Err(format!("Frequencies length does not match amplitudes length for {:?}", gps_time).into());
115+
bail!("Frequencies length does not match amplitudes length for {:?}", gps_time);
115116
}
116117
if let Some(chan_ffts) = self.ffts.get_mut(&channel) {
117118
chan_ffts.append(&mut vec![fft]);

0 commit comments

Comments
 (0)