Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 26 additions & 30 deletions console_backend/src/baseline_tab.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use capnp::message::Builder;
use std::collections::HashMap;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this was a mistake?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I think I just reordered the imports std -> third_party -> crate. we're a little bit all over the place with that (not that it particularity matters)


use capnp::message::Builder;
use log::error;
use sbp::messages::{
navigation::{MsgAgeCorrections, MsgUtcTime},
orientation::MsgBaselineHeading,
piksi::MsgResetFilters,
};
use std::{collections::HashMap, io::Write};

use crate::constants::*;
use crate::date_conv::*;
use crate::output::{BaselineLog, CsvSerializer};
use crate::output::BaselineLog;
use crate::piksi_tools_constants::EMPTY_STR;
use crate::types::{
BaselineNED, CapnProtoSender, Deque, GnssModes, GpsTime, MsgSender, Result, SharedState,
Expand Down Expand Up @@ -52,9 +52,8 @@ pub(crate) struct BaselineTabButtons {
/// - `table`: This stores all the key/value pairs to be displayed in the Baseline Table.
/// - `utc_source`: The string equivalent for the source of the UTC updates.
/// - `utc_time`: The stored monotonic Utc time.
/// - `baseline_log_file`: The CsvSerializer corresponding to an open velocity log if any.
/// - `week`: The stored week value from GPS Time messages.
pub struct BaselineTab<'a, S: CapnProtoSender, W: Write> {
pub struct BaselineTab<'a, S: CapnProtoSender> {
age_corrections: Option<f64>,
client_sender: S,
heading: Option<f64>,
Expand All @@ -73,17 +72,12 @@ pub struct BaselineTab<'a, S: CapnProtoSender, W: Write> {
table: HashMap<&'a str, String>,
utc_source: Option<String>,
utc_time: Option<UtcDateTime>,
pub baseline_log_file: Option<CsvSerializer>,
week: Option<u16>,
wtr: MsgSender<W>,
wtr: MsgSender,
}

impl<'a, S: CapnProtoSender, W: Write> BaselineTab<'a, S, W> {
pub fn new(
shared_state: SharedState,
client_sender: S,
wtr: MsgSender<W>,
) -> BaselineTab<'a, S, W> {
impl<'a, S: CapnProtoSender> BaselineTab<'a, S> {
pub fn new(shared_state: SharedState, client_sender: S, wtr: MsgSender) -> BaselineTab<'a, S> {
BaselineTab {
age_corrections: None,
client_sender,
Expand Down Expand Up @@ -122,7 +116,6 @@ impl<'a, S: CapnProtoSender, W: Write> BaselineTab<'a, S, W> {
},
utc_source: None,
utc_time: None,
baseline_log_file: None,
week: None,
wtr,
}
Expand Down Expand Up @@ -294,22 +287,25 @@ impl<'a, S: CapnProtoSender, W: Write> BaselineTab<'a, S, W> {
}
}

if let Some(baseline_file) = &mut self.baseline_log_file {
let pc_time = format!("{}:{:0>6.06}", tloc, secloc);
if let Err(err) = baseline_file.serialize(&BaselineLog {
pc_time,
gps_time,
tow_s: Some(tow),
north_m: Some(n),
east_m: Some(e),
down_m: Some(d),
h_accuracy_m: Some(h_accuracy),
v_accuracy_m: Some(v_accuracy),
distance_m: Some(dist),
flags: baseline_ned_fields.flags,
num_sats: baseline_ned_fields.n_sats,
}) {
eprintln!("Unable to to write to baseline log, error {}.", err);
{
let mut shared_data = self.shared_state.lock().unwrap();
if let Some(ref mut baseline_file) = (*shared_data).baseline_tab.log_file {
let pc_time = format!("{}:{:0>6.06}", tloc, secloc);
if let Err(err) = baseline_file.serialize(&BaselineLog {
pc_time,
gps_time,
tow_s: Some(tow),
north_m: Some(n),
east_m: Some(e),
down_m: Some(d),
h_accuracy_m: Some(h_accuracy),
v_accuracy_m: Some(v_accuracy),
distance_m: Some(dist),
flags: baseline_ned_fields.flags,
num_sats: baseline_ned_fields.n_sats,
}) {
eprintln!("Unable to to write to baseline log, error {}.", err);
}
}
}

Expand Down
6 changes: 5 additions & 1 deletion console_backend/src/bin/fft_monitor.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use anyhow::Context;
use clap::Clap;
use crossbeam::{channel, scope};
use sbp::{messages::piksi::MsgSpecan, sbp_tools::SBPTools};
Expand Down Expand Up @@ -60,7 +61,10 @@ fn main() -> Result<()> {
println!("Writing to file: {}", &filename);
let channel = opts.channel;
let num_ffts = opts.num_ffts;
let (rdr, _) = opts.into_conn().try_connect(/*shared_state=*/ None)?;
let (rdr, _) = opts
.into_conn()
.try_connect(/*shared_state=*/ None)
.context("while connecting")?;

scope(|s| {
s.spawn(|_| run(rdr));
Expand Down
16 changes: 8 additions & 8 deletions console_backend/src/bin/fileio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crossbeam::{channel, scope};
use sbp::sbp_tools::SBPTools;

use console_backend::{
broadcaster::Broadcaster,
broadcaster::Link,
cli_options::Input,
fileio::Fileio,
types::{MsgSender, Result},
Expand Down Expand Up @@ -49,15 +49,15 @@ pub enum Opts {
}

fn main() -> Result<()> {
let bc = Broadcaster::new();
let link = Link::new();
let link_source = link.clone();

let (done_tx, done_rx) = channel::bounded(0);

let bc_source = bc.clone();
let run = move |rdr| {
let messages = sbp::iter_messages(rdr).log_errors(log::Level::Debug);
for msg in messages {
bc_source.send(&msg, None);
link_source.send(&msg, None);
if done_rx.try_recv().is_ok() {
break;
}
Expand All @@ -74,7 +74,7 @@ fn main() -> Result<()> {
let sender = MsgSender::new(wtr);
scope(|s| {
s.spawn(|_| run(rdr));
let mut fileio = Fileio::new(bc, sender);
let mut fileio = Fileio::new(link, sender);
let data = fs::File::open(source)?;
fileio.overwrite(dest, data)?;
eprintln!("file written successfully.");
Expand All @@ -92,7 +92,7 @@ fn main() -> Result<()> {
let sender = MsgSender::new(wtr);
scope(|s| {
s.spawn(|_| run(rdr));
let mut fileio = Fileio::new(bc, sender);
let mut fileio = Fileio::new(link, sender);
let dest: Box<dyn Write> = match dest {
Some(path) => Box::new(fs::File::create(path)?),
None => Box::new(io::stdout()),
Expand All @@ -108,7 +108,7 @@ fn main() -> Result<()> {
let sender = MsgSender::new(wtr);
scope(|s| {
s.spawn(|_| run(rdr));
let mut fileio = Fileio::new(bc, sender);
let mut fileio = Fileio::new(link, sender);
let files = fileio.readdir(path)?;
eprintln!("{:#?}", files);
done_tx.send(true).unwrap();
Expand All @@ -121,7 +121,7 @@ fn main() -> Result<()> {
let sender = MsgSender::new(wtr);
scope(|s| {
s.spawn(|_| run(rdr));
let fileio = Fileio::new(bc, sender);
let fileio = Fileio::new(link, sender);
fileio.remove(path)?;
eprintln!("file deleted.");
done_tx.send(true).unwrap();
Expand Down
134 changes: 133 additions & 1 deletion console_backend/src/broadcaster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use sbp::{
messages::{ConcreteMessage, SBPMessage, SBP},
time::{GpsTime, GpsTimeError},
};
use slotmap::HopSlotMap;
use slotmap::{DenseSlotMap, HopSlotMap};

type MaybeGpsTime = Option<Result<GpsTime, GpsTimeError>>;

Expand Down Expand Up @@ -164,6 +164,117 @@ where
}
}

pub trait Handler<Event, Kind> {
fn run(&mut self, event: Event, time: MaybeGpsTime);
}

pub struct WithTime;

impl<F, E> Handler<E, WithTime> for F
where
F: FnMut(E, MaybeGpsTime),
E: Event,
{
fn run(&mut self, event: E, time: MaybeGpsTime) {
(self)(event, time)
}
}

pub struct WithoutTime;

impl<F, E> Handler<E, WithoutTime> for F
where
F: FnMut(E),
E: Event,
{
fn run(&mut self, event: E, _time: MaybeGpsTime) {
(self)(event)
}
}

pub struct OnlyTime;

impl<F, E> Handler<E, OnlyTime> for F
where
F: FnMut(GpsTime),
E: Event,
{
fn run(&mut self, _event: E, time: MaybeGpsTime) {
match time {
Some(Ok(time)) => (self)(time),
_ => {}
}
}
}

struct Callback<'a> {
func: Box<dyn FnMut(SBP, MaybeGpsTime) + Send + 'a>,
msg_types: &'static [u16],
}

pub struct Link<'a> {
callbacks: Arc<Mutex<DenseSlotMap<KeyInner, Callback<'a>>>>,
}

impl<'a> Link<'a> {
const CB_LOCK_FAILURE: &'static str = "failed to aquire lock on callbacks";

pub fn new() -> Self {
Self {
callbacks: Arc::new(Mutex::new(DenseSlotMap::with_key())),
}
}

pub fn send(&self, message: &SBP, gps_time: MaybeGpsTime) -> bool {
let msg_type = message.get_message_type();
let mut cbs = self.callbacks.lock().expect(Self::CB_LOCK_FAILURE);
let to_call = cbs
.values_mut()
.filter(|cb| cb.msg_types.is_empty() || cb.msg_types.iter().any(|ty| ty == &msg_type));
let mut called = false;
for cb in to_call {
(cb.func)(message.clone(), gps_time.clone());
called = true;
}
called
}

pub fn register_cb<H, E, K>(&mut self, mut handler: H) -> Key
where
H: Handler<E, K> + Send + 'a,
E: Event,
{
let mut cbs = self.callbacks.lock().expect(Self::CB_LOCK_FAILURE);
let inner = cbs.insert(Callback {
func: Box::new(move |msg, time| {
let event = E::from_sbp(msg);
handler.run(event, time)
}),
msg_types: E::MESSAGE_TYPES,
});
Key { inner }
}

pub fn unregister_cb(&self, key: Key) {
let mut cbs = self.callbacks.lock().expect(Self::CB_LOCK_FAILURE);
cbs.remove(key.inner);
}
}

impl Clone for Link<'_> {
fn clone(&self) -> Self {
Self {
callbacks: Arc::clone(&self.callbacks),
}
}
}

impl Default for Link<'_> {
fn default() -> Self {
Self::new()
}
}

#[cfg(test)]
mod tests {
use crossbeam::scope;
Expand All @@ -174,6 +285,27 @@ mod tests {

use super::*;

struct Counter {
count: usize,
}

impl Counter {
fn obs(&mut self, _: MsgObs) {
self.count += 1;
}
}

#[test]
fn test_dispatcher() {
let mut d = Link::new();
let mut c = Counter { count: 0 };
d.register_cb(|obs| c.obs(obs));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Super cool to be able to register a callback based on the type of the function parameter!

d.send(&make_msg_obs(), None);
d.send(&make_msg_obs_dep_a(), None);
drop(d);
assert_eq!(c.count, 1);
}

#[test]
fn test_broadcaster() {
let b = Broadcaster::new();
Expand Down
16 changes: 5 additions & 11 deletions console_backend/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::constants::*;
use crate::errors::*;
use crate::process_messages::process_messages;
use crate::types::*;
use chrono::{DateTime, Utc};
use anyhow::anyhow;
use crossbeam::channel::{unbounded, Receiver, Sender};
use log::{error, info};
use std::{
Expand All @@ -16,10 +16,6 @@ use std::{
time::Duration,
};

pub type Error = std::boxed::Box<dyn std::error::Error>;
pub type Result<T> = std::result::Result<T, Error>;
pub type UtcDateTime = DateTime<Utc>;

#[derive(Clone)]
pub struct TcpConnection {
name: String,
Expand All @@ -33,13 +29,11 @@ impl TcpConnection {
}
fn socket_addrs(name: String) -> Result<SocketAddr> {
let socket = &mut name.to_socket_addrs()?;
let socket = if let Some(socket_) = socket.next() {
socket_
if let Some(s) = socket.next() {
Ok(s)
} else {
let e: Box<dyn std::error::Error> = String::from(TCP_CONNECTION_PARSING_FAILURE).into();
return Err(e);
};
Ok(socket)
Err(anyhow!("{}", TCP_CONNECTION_PARSING_FAILURE))
}
}
fn name(&self) -> String {
self.name.clone()
Expand Down
3 changes: 2 additions & 1 deletion console_backend/src/fft_monitor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::constants::{AMPLITUDES, CHANNELS, FREQUENCIES, SIGNALS_TOTAL};
use crate::types::{Result, Specan};
use anyhow::bail;
use serde::Serialize;
use std::{
collections::HashMap,
Expand Down Expand Up @@ -111,7 +112,7 @@ impl FftMonitor {
if let Some(freqs) = fft.get(FREQUENCIES) {
if let Some(amps) = fft.get(AMPLITUDES) {
if freqs.len() != amps.len() {
return Err(format!("Frequencies length does not match amplitudes length for {:?}", gps_time).into());
bail!("Frequencies length does not match amplitudes length for {:?}", gps_time);
}
if let Some(chan_ffts) = self.ffts.get_mut(&channel) {
chan_ffts.append(&mut vec![fft]);
Expand Down
Loading