Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions console_backend/benches/cpu_benches.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#![allow(unused_imports)]
use criterion::{criterion_group, criterion_main, Criterion};
use crossbeam::channel;
use glob::glob;
use sbp::sbp_tools::SBPTools;
use std::{
fs, io,
path::Path,
sync::{mpsc, Arc, Mutex},
sync::{Arc, Mutex},
thread, time,
};

Expand Down Expand Up @@ -36,8 +37,7 @@ pub fn criterion_benchmark(c: &mut Criterion) {
}

fn run_process_messages(file_in_name: &str, failure: bool) {
use std::sync::mpsc::Receiver;
let (client_recv_tx, client_recv_rx) = mpsc::channel::<Receiver<Vec<u8>>>();
let (client_recv_tx, client_recv_rx) = channel::unbounded::<channel::Receiver<Vec<u8>>>();
let recv_thread = thread::spawn(move || {
let client_recv = client_recv_rx.recv().unwrap();
let mut iter_count = 0;
Expand All @@ -50,7 +50,7 @@ fn run_process_messages(file_in_name: &str, failure: bool) {
assert!(iter_count > 0);
});
{
let (client_send_, client_recv) = mpsc::channel::<Vec<u8>>();
let (client_send_, client_recv) = channel::unbounded::<Vec<u8>>();
client_recv_tx
.send(client_recv)
.expect("sending client recv handle should succeed");
Expand Down
10 changes: 5 additions & 5 deletions console_backend/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,10 +314,10 @@ impl Drop for ConnectionState {
mod tests {
use super::*;
use crate::test_common::{backup_file, filename, restore_backup_file};
use crossbeam::channel;
use serial_test::serial;
use std::{
str::FromStr,
sync::mpsc,
thread::sleep,
time::{Duration, SystemTime},
};
Expand Down Expand Up @@ -363,7 +363,7 @@ mod tests {
assert_eq!(conn.realtime_delay(), RealtimeDelay::Off);
}

fn receive_thread(client_recv: mpsc::Receiver<Vec<u8>>) -> JoinHandle<()> {
fn receive_thread(client_recv: channel::Receiver<Vec<u8>>) -> JoinHandle<()> {
thread::spawn(move || {
let mut iter_count = 0;

Expand All @@ -384,7 +384,7 @@ mod tests {
let bfilename = filename();
backup_file(bfilename.clone());
let shared_state = SharedState::new();
let (client_send_, client_receive) = mpsc::channel::<Vec<u8>>();
let (client_send_, client_receive) = channel::unbounded::<Vec<u8>>();
let client_send = ClientSender::new(client_send_);
let connection_state = ConnectionState::new(client_send, shared_state.clone());
let filename = TEST_SHORT_FILEPATH.to_string();
Expand All @@ -411,7 +411,7 @@ mod tests {
let bfilename = filename();
backup_file(bfilename.clone());
let shared_state = SharedState::new();
let (client_send_, client_receive) = mpsc::channel::<Vec<u8>>();
let (client_send_, client_receive) = channel::unbounded::<Vec<u8>>();
let client_send = ClientSender::new(client_send_);
let connection_state = ConnectionState::new(client_send, shared_state.clone());
let filename = TEST_SHORT_FILEPATH.to_string();
Expand Down Expand Up @@ -442,7 +442,7 @@ mod tests {
let bfilename = filename();
backup_file(bfilename.clone());
let shared_state = SharedState::new();
let (client_send_, client_receive) = mpsc::channel::<Vec<u8>>();
let (client_send_, client_receive) = channel::unbounded::<Vec<u8>>();
let client_send = ClientSender::new(client_send_);
let connection_state = ConnectionState::new(client_send.clone(), shared_state.clone());
let filename = TEST_FILEPATH.to_string();
Expand Down
15 changes: 8 additions & 7 deletions console_backend/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use capnp::serialize;
use crossbeam::channel;
use log::{error, info};
use pyo3::exceptions;
use pyo3::prelude::*;
Expand All @@ -8,13 +9,13 @@ use std::{
io::{BufReader, Cursor},
path::PathBuf,
str::FromStr,
sync::mpsc,
thread, time,
};

use crate::cli_options::*;
use crate::connection::ConnectionState;
use crate::console_backend_capnp as m;

use crate::errors::*;
use crate::log_panel::{setup_logging, LogLevel};
use crate::output::{CsvLogging, SbpLogging};
Expand All @@ -24,13 +25,13 @@ use crate::utils::{refresh_loggingbar, refresh_navbar};
/// The backend server
#[pyclass]
struct Server {
client_recv: Option<mpsc::Receiver<Vec<u8>>>,
client_recv: Option<channel::Receiver<Vec<u8>>>,
client_sender: Option<ClientSender>,
}

#[pyclass]
struct ServerEndpoint {
server_send: Option<mpsc::Sender<Vec<u8>>>,
server_send: Option<channel::Sender<Vec<u8>>>,
}

#[pymethods]
Expand Down Expand Up @@ -115,7 +116,7 @@ fn handle_cli(opt: CliOptions, connection_state: &ConnectionState, shared_state:
fn backend_recv_thread(
connection_state: ConnectionState,
client_send: ClientSender,
server_recv: mpsc::Receiver<Vec<u8>>,
server_recv: channel::Receiver<Vec<u8>>,
shared_state: SharedState,
) {
thread::spawn(move || {
Expand Down Expand Up @@ -303,7 +304,7 @@ impl Server {
match client_recv.recv_timeout(time::Duration::from_millis(1)) {
Ok(buf) => break Some(buf),
Err(err) => {
use std::sync::mpsc::RecvTimeoutError;
use channel::RecvTimeoutError;
if matches!(err, RecvTimeoutError::Timeout) {
if self.client_sender.as_ref().unwrap().connected.get() {
continue;
Expand All @@ -327,8 +328,8 @@ impl Server {

#[text_signature = "($self, /)"]
pub fn start(&mut self) -> PyResult<ServerEndpoint> {
let (client_send_, client_recv) = mpsc::channel::<Vec<u8>>();
let (server_send, server_recv) = mpsc::channel::<Vec<u8>>();
let (client_send_, client_recv) = channel::unbounded::<Vec<u8>>();
let (server_send, server_recv) = channel::unbounded::<Vec<u8>>();
let client_send = ClientSender::new(client_send_);
self.client_recv = Some(client_recv);
self.client_sender = Some(client_send.clone());
Expand Down
6 changes: 3 additions & 3 deletions console_backend/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::piksi_tools_constants::*;
use crate::utils::{mm_to_m, ms_to_sec, set_connected_frontend};
use anyhow::{Context, Result as AHResult};
use chrono::{DateTime, Utc};
use crossbeam::channel;
use directories::{ProjectDirs, UserDirs};
use indexmap::set::IndexSet;
use lazy_static::lazy_static;
Expand Down Expand Up @@ -44,7 +45,6 @@ use std::{
path::PathBuf,
str::FromStr,
sync::{
self,
atomic::{AtomicBool, Ordering::*},
Arc, Mutex,
},
Expand Down Expand Up @@ -126,11 +126,11 @@ pub trait CapnProtoSender: Debug + Clone + Send {

#[derive(Debug, Clone)]
pub struct ClientSender {
pub inner: sync::mpsc::Sender<Vec<u8>>,
pub inner: channel::Sender<Vec<u8>>,
pub connected: ArcBool,
}
impl ClientSender {
pub fn new(inner: sync::mpsc::Sender<Vec<u8>>) -> Self {
pub fn new(inner: channel::Sender<Vec<u8>>) -> Self {
Self {
inner,
connected: ArcBool::new_with(true),
Expand Down
9 changes: 4 additions & 5 deletions console_backend/tests/mem_benches.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
#[cfg(feature = "benches")]
mod mem_bench_impl {

use crossbeam::channel;
use ndarray::{ArrayView, Axis, Dim};
use std::{
error::Error,
result::Result,
sync::{mpsc, Arc, Mutex},
sync::{Arc, Mutex},
thread,
};

Expand Down Expand Up @@ -49,8 +49,7 @@ mod mem_bench_impl {
/// - mean - std >= absolute_mean
#[test]
fn test_run_process_messages() {
use std::sync::mpsc::Receiver;
let (client_recv_tx, client_recv_rx) = mpsc::channel::<Receiver<Vec<u8>>>();
let (client_recv_tx, client_recv_rx) = channel::unbounded::<channel::Receiver<Vec<u8>>>();
let pid = get_current_pid().unwrap();
println!("PID: {}", pid);
let is_running = Arc::new(Mutex::new(true));
Expand Down Expand Up @@ -89,7 +88,7 @@ mod mem_bench_impl {
});

{
let (client_send_, client_recv) = mpsc::channel::<Vec<u8>>();
let (client_send_, client_recv) = channel::unbounded::<Vec<u8>>();
client_recv_tx
.send(client_recv)
.expect("sending client recv handle should succeed");
Expand Down