4 changes: 4 additions & 0 deletions console_backend/src/constants.rs
@@ -8,6 +8,10 @@ pub(crate) const APPLICATION_ORGANIZATION: &str = "swift-nav";
pub(crate) const APPLICATION_NAME: &str = "swift_navigation_console";
// CLI constants.

// Server constants.
#[allow(dead_code)]
pub(crate) const FETCH_MESSAGE_TIMEOUT_MS: u64 = 1;

// Process Message constants.
pub(crate) const PAUSE_LOOP_SLEEP_DURATION_MS: u64 = 100;

1 change: 1 addition & 0 deletions console_backend/src/process_messages.rs
@@ -202,6 +202,7 @@ where
}
if conn.close_when_done() {
shared_state.set_running(false, client_send.clone());
shared_state.stop_server_running();
close_frontend(&mut client_send);
}
Ok(())
71 changes: 57 additions & 14 deletions console_backend/src/server.rs
@@ -8,32 +8,36 @@ use async_logger_log::Logger;
use log::error;
use std::{
io::{BufReader, Cursor},
ops::Drop,
path::PathBuf,
str::FromStr,
sync::mpsc,
thread,
time::Duration,
};

use crate::cli_options::*;
use crate::common_constants::ApplicationStates;
use crate::connection::ConnectionState;
use crate::console_backend_capnp as m;
-use crate::constants::LOG_WRITER_BUFFER_MESSAGE_COUNT;
+use crate::constants::{FETCH_MESSAGE_TIMEOUT_MS, LOG_WRITER_BUFFER_MESSAGE_COUNT};
use crate::errors::*;
use crate::log_panel::{splitable_log_formatter, LogLevel, LogPanelWriter};
use crate::output::{CsvLogging, SbpLogging};
-use crate::types::{ClientSender, FlowControl, RealtimeDelay, SharedState};
+use crate::types::{ArcBool, ClientSender, FlowControl, RealtimeDelay, SharedState};
use crate::utils::{refresh_loggingbar, refresh_navbar};

-/// The backend server
-#[pyclass]
-struct Server {
-    client_recv: Option<mpsc::Receiver<Vec<u8>>>,
-}

#[pyclass]
struct ServerEndpoint {
server_send: Option<mpsc::Sender<Vec<u8>>>,
}
impl Drop for ServerEndpoint {
Contributor: I don't think we need any explicit drop implementations on the Server/ServerEndpoint types, because as long as the Server object is held by the Python code, this drop implementation isn't going to run. In order for drop to run, the Python code would need to explicitly delete the object. A better option is to request termination explicitly, and then do a blocking wait for the termination to occur.
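
A minimal sketch of that request-then-wait shape (the `stop` name and fields here are illustrative, not this PR's actual API):

```rust
use std::{sync::mpsc, thread};

struct Server {
    server_send: Option<mpsc::Sender<Vec<u8>>>,
    handle: Option<thread::JoinHandle<()>>,
}

impl Server {
    /// Request termination, then block until the worker thread has exited.
    fn stop(&mut self) {
        // Dropping the sender disconnects the channel; the worker observes
        // this as a receive error and uses it as the signal to exit.
        self.server_send.take();
        if let Some(handle) = self.handle.take() {
            let _ = handle.join(); // blocking wait for termination
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel::<Vec<u8>>();
    let worker = thread::spawn(move || while rx.recv().is_ok() {});
    let mut server = Server { server_send: Some(tx), handle: Some(worker) };
    server.stop(); // returns only once the worker is gone
}
```

Because `stop` drops the sender before joining, the worker's receive loop is guaranteed to see the disconnect and terminate.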

fn drop(&mut self) {
if let Some(sender) = self.server_send.take() {
drop(sender);
Contributor: This should be in a helper function exposed to Python, so that Python can indicate that it's done with the server.
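
Concretely, in the pyo3 style this PR already uses, such a helper might look like the following (the `shutdown` name is hypothetical, and it is shown as a separate impl block only for clarity):

```rust
// Hypothetical helper: Python calls endpoint.shutdown() when it is done with
// the server, instead of waiting for Drop to run.
#[pymethods]
impl ServerEndpoint {
    #[text_signature = "($self, /)"]
    pub fn shutdown(&mut self) {
        // Taking the sender out of the Option drops it, closing the channel;
        // the backend's receive loop then sees a disconnect and exits.
        self.server_send.take();
    }
}
```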

}
}
}

#[pymethods]
impl ServerEndpoint {
@@ -101,23 +105,44 @@ fn handle_cli(opt: CliOptions, connection_state: &ConnectionState, shared_state:
}
}

/// The backend server
#[pyclass]
struct Server {
client_recv: Option<mpsc::Receiver<Vec<u8>>>,
is_running: ArcBool,
handle: Option<thread::JoinHandle<()>>,
}
impl Drop for Server {
fn drop(&mut self) {
self.is_running.set(false);

if let Some(recv) = self.client_recv.take() {
Contributor: I don't think we need to explicitly drop the receiver. The close of the sender should be reflected as an error when we attempt to receive, and then we can break out of the receive loop on that error condition.
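
For reference, a dropped `Sender` already surfaces on the receiving side, so the loop can use the error itself as the exit condition — a minimal sketch:

```rust
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel::<Vec<u8>>();
    tx.send(vec![1, 2, 3]).unwrap();
    drop(tx); // closing the sender ends the stream
    loop {
        match rx.recv() {
            Ok(buf) => println!("got {} bytes", buf.len()),
            Err(mpsc::RecvError) => break, // all senders gone: exit the loop
        }
    }
}
```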

drop(recv);
}
Collaborator Author: I also had a join here for the thread spawned in "fetch_message", but it causes an infinite hang in the macOS frontend benchmark whenever we try to kill the app, although it works fine on Linux/Windows 😢. Nonetheless, I think I am seeing fewer crashes without the join. I am setting things up so that the thread should end, but it's definitely not ideal without the join.
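
One possible workaround (an assumption, not something this PR does) is to pair the `JoinHandle` with a completion channel, so shutdown can bound how long it waits before giving up on the join:

```rust
use std::{sync::mpsc, thread, time::Duration};

fn main() {
    let (done_tx, done_rx) = mpsc::channel::<()>();
    let handle = thread::spawn(move || {
        // ... worker loop would run here ...
        drop(done_tx); // channel closes when the thread finishes
    });
    match done_rx.recv_timeout(Duration::from_secs(2)) {
        // Disconnected means the sender was dropped: the thread has finished,
        // so join() will return promptly instead of hanging.
        Err(mpsc::RecvTimeoutError::Disconnected) => {
            let _ = handle.join();
        }
        Err(mpsc::RecvTimeoutError::Timeout) => {
            eprintln!("worker did not exit in time; skipping join");
        }
        Ok(()) => unreachable!("nothing is ever sent on this channel"),
    }
}
```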

}
}

#[pymethods]
impl Server {
#[new]
pub fn __new__() -> Self {
-        Server { client_recv: None }
+        Server {
+            client_recv: None,
+            is_running: ArcBool::new(),
+            handle: None,
+        }
}

#[text_signature = "($self, /)"]
pub fn fetch_message(&mut self, py: Python) -> Option<PyObject> {
let result = py.allow_threads(move || {
let client_recv = self.client_recv.as_ref();
if let Some(client_recv) = client_recv {
-            let buf = client_recv.recv();
-            if let Ok(buf) = buf {
+            if let Ok(buf) =
+                client_recv.recv_timeout(Duration::from_millis(FETCH_MESSAGE_TIMEOUT_MS))
+            {
                 Some(buf)
             } else {
-                println!("error receiving message: {:?}", buf);
                 None
             }
} else {
@@ -156,7 +181,12 @@ impl Server {
handle_cli(opt, &connection_state, shared_state.clone());
refresh_navbar(&mut client_send.clone(), shared_state.clone());
refresh_loggingbar(&mut client_send.clone(), shared_state.clone());
-        thread::spawn(move || loop {
+        self.is_running.set(true);
+        let is_running = self.is_running.clone();
+        self.handle = Some(thread::spawn(move || loop {
+            if !is_running.get() {
+                break;
+            }
let buf = server_recv.recv();
Contributor: I think all you need to do is add a timeout here, and then move the drop for ServerEndpoint into a method exposed to Python. There's no need for the is_running variable; just use the closure of the recv endpoint to signal that the thread should exit.
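
A sketch of that shape, where the timeout distinguishes "no message yet" from "channel closed" and closure is the only shutdown signal (the 100 ms value is a placeholder):

```rust
use std::{sync::mpsc, thread, time::Duration};

fn main() {
    let (tx, rx) = mpsc::channel::<Vec<u8>>();
    thread::spawn(move || {
        tx.send(vec![0u8; 4]).unwrap();
        // tx is dropped when this closure ends, which terminates the loop below
    });
    loop {
        match rx.recv_timeout(Duration::from_millis(100)) {
            Ok(buf) => println!("got {} bytes", buf.len()),
            Err(mpsc::RecvTimeoutError::Timeout) => continue, // nothing yet; poll again
            Err(mpsc::RecvTimeoutError::Disconnected) => break, // sender dropped: shut down
        }
    }
}
```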

if let Ok(buf) = buf {
let mut buf_reader = BufReader::new(Cursor::new(buf));
@@ -176,6 +206,19 @@ }
}
};
match message {
m::message::Status(Ok(cv_in)) => {
Contributor: This shouldn't be necessary if we just close the sender, as mentioned above.

let status_req =
cv_in.get_text().expect(CAP_N_PROTO_DESERIALIZATION_FAILURE);
match ApplicationStates::from_str(status_req) {
Ok(ApplicationStates::CLOSE) => {
let shared_state_clone = shared_state.clone();
shared_state_clone.stop_server_running();
is_running.set(false);
}
Ok(_) => {}
Err(_) => {}
}
}
m::message::ConnectRequest(Ok(conn_req)) => {
let request = conn_req
.get_request()
@@ -328,7 +371,7 @@ impl Server {
println!("error: {:?}", buf);
break;
}
-        });
+        }));
Ok(server_endpoint)
}
}
31 changes: 25 additions & 6 deletions src/main/python/main.py
@@ -120,6 +120,7 @@
PIKSI_HOST = "piksi-relay-bb9f2b10e53143f4a816a11884e679cf.ce.swiftnav.com"
PIKSI_PORT = 55555

IS_RUNNING = True

MAIN_INDEX = "MAIN_INDEX"
SUB_INDEX = "SUB_INDEX"
@@ -180,13 +181,15 @@


def receive_messages(app_, backend, messages):
-    while True:
+    while IS_RUNNING:
buffer = backend.fetch_message()
if buffer is None:
continue
Message = messages.Message
m = Message.from_bytes(buffer)
if m.which == Message.Union.Status:
if m.status.text == ApplicationStates.CLOSE:
-                return app_.quit()
+                break
if m.status.text == ApplicationStates.CONNECTED:
NAV_BAR[Keys.CONNECTED] = True
elif m.status.text == ApplicationStates.DISCONNECTED:
@@ -299,6 +302,8 @@ def receive_messages(app_, backend, messages):
else:
pass

app_.quit()


class DataModel(QObject):

@@ -310,6 +315,14 @@ def __init__(self, endpoint, messages):
self.endpoint = endpoint
self.messages = messages

def status(self, status: ApplicationStates):
Message = self.messages.Message
msg = Message()
msg.status = msg.init(Message.Union.Status)
msg.status.text = str(status)
buffer = msg.to_bytes()
self.endpoint.send_message(buffer)

@Slot() # type: ignore
def connect(self) -> None:
self.connect_tcp(PIKSI_HOST, PIKSI_PORT)
@@ -584,14 +597,20 @@ def handle_cli_arguments(args: argparse.Namespace, globals_: QObject):
globals_main = globals_main.property("globals") # type: ignore

handle_cli_arguments(args_main, globals_main)

-threading.Thread(
+running_lock = threading.Lock()
+recv_thread = threading.Thread(
target=receive_messages,
args=(
app,
backend_main,
messages_main,
),
-    daemon=True,
-).start()
-sys.exit(app.exec_())
+)
+recv_thread.start()
+app.exec_()
+data_model.status(ApplicationStates.CLOSE)
+with running_lock:
Contributor: We should be able to call something like messages_main.stop_server() and then just call recv_thread.join(); closing the sender end of the mpsc should cause the loop in fetch_messages to terminate.
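
In main.py, that sequence would look roughly like this (`stop_server()` is the reviewer's hypothetical helper, shown here on `backend_main` on the assumption that it owns the channel):

```python
# Sketch of the suggested shutdown sequence; stop_server() is hypothetical.
app.exec_()                 # returns when the Qt event loop exits
backend_main.stop_server()  # hypothetical: drops the sender end of the mpsc channel
recv_thread.join()          # receive loop terminates once the channel closes
sys.exit()
```

With this shape, the shared IS_RUNNING flag and the lock around it become unnecessary.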

+    IS_RUNNING = False
+recv_thread.join()
+sys.exit()