Add joining to increase reliability when closing [CPP-237]. #91
Changes from all commits
The Rust backend:

```diff
@@ -8,32 +8,36 @@ use async_logger_log::Logger;
 use log::error;
 use std::{
     io::{BufReader, Cursor},
+    ops::Drop,
     path::PathBuf,
+    str::FromStr,
     sync::mpsc,
     thread,
+    time::Duration,
 };

 use crate::cli_options::*;
 use crate::common_constants::ApplicationStates;
 use crate::connection::ConnectionState;
 use crate::console_backend_capnp as m;
-use crate::constants::LOG_WRITER_BUFFER_MESSAGE_COUNT;
+use crate::constants::{FETCH_MESSAGE_TIMEOUT_MS, LOG_WRITER_BUFFER_MESSAGE_COUNT};
 use crate::errors::*;
 use crate::log_panel::{splitable_log_formatter, LogLevel, LogPanelWriter};
 use crate::output::{CsvLogging, SbpLogging};
-use crate::types::{ClientSender, FlowControl, RealtimeDelay, SharedState};
+use crate::types::{ArcBool, ClientSender, FlowControl, RealtimeDelay, SharedState};
 use crate::utils::{refresh_loggingbar, refresh_navbar};

-/// The backend server
-#[pyclass]
-struct Server {
-    client_recv: Option<mpsc::Receiver<Vec<u8>>>,
-}
-
 #[pyclass]
 struct ServerEndpoint {
     server_send: Option<mpsc::Sender<Vec<u8>>>,
 }
+impl Drop for ServerEndpoint {
+    fn drop(&mut self) {
+        if let Some(sender) = self.server_send.take() {
+            drop(sender);
```
> This should be in a helper function exposed to Python, so that Python can indicate that it's done with the server.
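A minimal sketch of what that helper could look like, assuming the `ServerEndpoint` shape from this diff (the `shutdown` name is an assumption, not part of the PR; the `#[text_signature]` style matches the pyo3 version the diff already uses):

```rust
use pyo3::prelude::*;
use std::sync::mpsc;

#[pyclass]
struct ServerEndpoint {
    server_send: Option<mpsc::Sender<Vec<u8>>>,
}

#[pymethods]
impl ServerEndpoint {
    /// Hypothetical helper: lets Python close the channel deliberately
    /// instead of waiting for a Drop that may never run.
    #[text_signature = "($self, /)"]
    pub fn shutdown(&mut self) {
        // Taking the sender out of the Option and dropping it disconnects
        // the channel; the backend loop's recv() then returns Err.
        if let Some(sender) = self.server_send.take() {
            drop(sender);
        }
    }
}
```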
```diff
+        }
+    }
+}

 #[pymethods]
 impl ServerEndpoint {
```
```diff
@@ -101,23 +105,44 @@ fn handle_cli(opt: CliOptions, connection_state: &ConnectionState, shared_state:
     }
 }

+/// The backend server
+#[pyclass]
+struct Server {
+    client_recv: Option<mpsc::Receiver<Vec<u8>>>,
+    is_running: ArcBool,
+    handle: Option<thread::JoinHandle<()>>,
+}
+impl Drop for Server {
+    fn drop(&mut self) {
+        self.is_running.set(false);
+
+        if let Some(recv) = self.client_recv.take() {
```
> I don't think we need to explicitly drop the receiver; the close of the sender should be reflected as an error when we attempt to receive, and then we can break out of the receive loop on that error condition.
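That pattern in isolation (a sketch, not the PR's code): once every `Sender` for a channel has been dropped, `recv()` returns `Err(RecvError)`, so the receive loop can exit on its own without anyone dropping the `Receiver` explicitly:

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (server_send, server_recv) = mpsc::channel::<Vec<u8>>();

    let handle = thread::spawn(move || loop {
        match server_recv.recv() {
            Ok(buf) => println!("got {} bytes", buf.len()),
            // Every sender is gone, so the channel is closed: stop looping.
            Err(mpsc::RecvError) => break,
        }
    });

    server_send.send(vec![1, 2, 3]).unwrap();
    drop(server_send); // closing the last sender ends the loop above
    handle.join().unwrap();
}
```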
```diff
+            drop(recv);
+        }
```
> I also had a join here for the thread spawned in "fetch_message", but it causes an infinite hang on the macOS frontend benchmark whenever we try to kill the app, although it works fine on Linux/Windows 😢. Nonetheless, I think I am seeing fewer crashes without the join. I am setting things up so that the thread should end, but it's definitely not ideal without the join.
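For reference, the join being described would look roughly like this (a sketch, with `Arc<AtomicBool>` standing in for the crate's `ArcBool`). The hang is consistent with joining a thread that is still parked inside a blocking `recv()`, which the timeout suggested in the next comment would address:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

struct Server {
    is_running: Arc<AtomicBool>, // stand-in for the crate's ArcBool
    handle: Option<thread::JoinHandle<()>>,
}

impl Drop for Server {
    fn drop(&mut self) {
        // Ask the worker thread to stop...
        self.is_running.store(false, Ordering::SeqCst);
        // ...then block until it has actually exited. If the worker is
        // stuck in a blocking recv(), this join never returns, matching
        // the hang described above.
        if let Some(handle) = self.handle.take() {
            let _ = handle.join();
        }
    }
}

fn main() {
    let is_running = Arc::new(AtomicBool::new(true));
    let flag = is_running.clone();
    let handle = thread::spawn(move || {
        // Polling worker: exits promptly once the flag flips.
        while flag.load(Ordering::SeqCst) {
            thread::sleep(Duration::from_millis(10));
        }
    });
    let server = Server { is_running, handle: Some(handle) };
    drop(server); // sets the flag and joins; returns quickly here
}
```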
```diff
+    }
+}

 #[pymethods]
 impl Server {
     #[new]
     pub fn __new__() -> Self {
-        Server { client_recv: None }
+        Server {
+            client_recv: None,
+            is_running: ArcBool::new(),
+            handle: None,
+        }
     }

     #[text_signature = "($self, /)"]
     pub fn fetch_message(&mut self, py: Python) -> Option<PyObject> {
         let result = py.allow_threads(move || {
             let client_recv = self.client_recv.as_ref();
             if let Some(client_recv) = client_recv {
-                let buf = client_recv.recv();
-                if let Ok(buf) = buf {
+                if let Ok(buf) =
+                    client_recv.recv_timeout(Duration::from_millis(FETCH_MESSAGE_TIMEOUT_MS))
+                {
                     Some(buf)
                 } else {
-                    println!("error receiving message: {:?}", buf);
                     None
                 }
             } else {
@@ -156,7 +181,12 @@ impl Server {
             handle_cli(opt, &connection_state, shared_state.clone());
             refresh_navbar(&mut client_send.clone(), shared_state.clone());
             refresh_loggingbar(&mut client_send.clone(), shared_state.clone());
-            thread::spawn(move || loop {
+            self.is_running.set(true);
+            let is_running = self.is_running.clone();
+            self.handle = Some(thread::spawn(move || loop {
+                if !is_running.get() {
+                    break;
+                }
                 let buf = server_recv.recv();
```
> I think all you need to do is add a timeout here, and then move the …
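A sketch of that suggestion (the 100 ms value and the `is_running` closure parameter are placeholders): `recv_timeout` lets the loop wake up periodically to re-check its exit flag, and it also distinguishes a timeout from a closed channel:

```rust
use std::sync::mpsc;
use std::time::Duration;

fn run_loop(server_recv: mpsc::Receiver<Vec<u8>>, is_running: impl Fn() -> bool) {
    loop {
        if !is_running() {
            break;
        }
        match server_recv.recv_timeout(Duration::from_millis(100)) {
            Ok(buf) => {
                // ... deserialize and dispatch the message here ...
                let _ = buf;
            }
            // Nothing arrived in time: loop around and re-check the flag.
            Err(mpsc::RecvTimeoutError::Timeout) => continue,
            // All senders dropped: the channel is closed, so exit.
            Err(mpsc::RecvTimeoutError::Disconnected) => break,
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    tx.send(b"ping".to_vec()).unwrap();
    drop(tx); // the loop exits via the Disconnected arm
    run_loop(rx, || true);
}
```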
```diff
                 if let Ok(buf) = buf {
                     let mut buf_reader = BufReader::new(Cursor::new(buf));
@@ -176,6 +206,19 @@ impl Server {
                         }
                     };
                     match message {
+                        m::message::Status(Ok(cv_in)) => {
```
> This shouldn't be necessary if we just close the sender as mentioned above.
```diff
+                            let status_req =
+                                cv_in.get_text().expect(CAP_N_PROTO_DESERIALIZATION_FAILURE);
+                            match ApplicationStates::from_str(status_req) {
+                                Ok(ApplicationStates::CLOSE) => {
+                                    let shared_state_clone = shared_state.clone();
+                                    shared_state_clone.stop_server_running();
+                                    is_running.set(false);
+                                }
+                                Ok(_) => {}
+                                Err(_) => {}
+                            }
+                        }
                         m::message::ConnectRequest(Ok(conn_req)) => {
                             let request = conn_req
                                 .get_request()
@@ -328,7 +371,7 @@ impl Server {
                 println!("error: {:?}", buf);
                 break;
             }
-        });
+        }));
         Ok(server_endpoint)
     }
 }
```
The Python frontend:
```diff
@@ -120,6 +120,7 @@
 PIKSI_HOST = "piksi-relay-bb9f2b10e53143f4a816a11884e679cf.ce.swiftnav.com"
 PIKSI_PORT = 55555

+IS_RUNNING = True

 MAIN_INDEX = "MAIN_INDEX"
 SUB_INDEX = "SUB_INDEX"
@@ -180,13 +181,15 @@
 def receive_messages(app_, backend, messages):
-    while True:
+    while IS_RUNNING:
         buffer = backend.fetch_message()
+        if buffer is None:
+            continue
         Message = messages.Message
         m = Message.from_bytes(buffer)
         if m.which == Message.Union.Status:
             if m.status.text == ApplicationStates.CLOSE:
-                return app_.quit()
+                break
             if m.status.text == ApplicationStates.CONNECTED:
                 NAV_BAR[Keys.CONNECTED] = True
             elif m.status.text == ApplicationStates.DISCONNECTED:
@@ -299,6 +302,8 @@ def receive_messages(app_, backend, messages):
         else:
             pass

+    app_.quit()
+

 class DataModel(QObject):
@@ -310,6 +315,14 @@ def __init__(self, endpoint, messages):
         self.endpoint = endpoint
         self.messages = messages

+    def status(self, status: ApplicationStates):
+        Message = self.messages.Message
+        msg = Message()
+        msg.status = msg.init(Message.Union.Status)
+        msg.status.text = str(status)
+        buffer = msg.to_bytes()
+        self.endpoint.send_message(buffer)
+
     @Slot()  # type: ignore
     def connect(self) -> None:
         self.connect_tcp(PIKSI_HOST, PIKSI_PORT)
@@ -584,14 +597,20 @@ def handle_cli_arguments(args: argparse.Namespace, globals_: QObject):
     globals_main = globals_main.property("globals")  # type: ignore

     handle_cli_arguments(args_main, globals_main)

-    threading.Thread(
+    running_lock = threading.Lock()
+    recv_thread = threading.Thread(
         target=receive_messages,
         args=(
             app,
             backend_main,
             messages_main,
         ),
         daemon=True,
-    ).start()
-    sys.exit(app.exec_())
+    )
+    recv_thread.start()
+    app.exec_()
+    data_model.status(ApplicationStates.CLOSE)
+    with running_lock:
```
> Should be able to call something like …
```diff
+        IS_RUNNING = False
+    recv_thread.join()
+    sys.exit()
```
> I don't think we need any explicit Drop implementations on the Server/ServerEndpoint types, because as long as the Server object is held by the Python code, this drop implementation isn't going to run. In order for drop to run, the Python code would need to explicitly delete the object; a better option is to just request termination, and then do a blocking wait for termination to occur.
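Sketched as pyo3 code, that suggestion might look like the following (the `join` method name and the `Arc<AtomicBool>` stand-in for the crate's `ArcBool` are assumptions, not the PR's API): Python requests termination explicitly and then blocks until the backend thread has actually exited, instead of relying on Drop ever running:

```rust
use pyo3::prelude::*;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

#[pyclass]
struct Server {
    is_running: Arc<AtomicBool>, // stand-in for the crate's ArcBool
    handle: Option<thread::JoinHandle<()>>,
}

#[pymethods]
impl Server {
    /// Hypothetical method: request termination, then block until the
    /// backend thread has actually exited.
    #[text_signature = "($self, /)"]
    pub fn join(&mut self, py: Python) {
        self.is_running.store(false, Ordering::SeqCst);
        if let Some(handle) = self.handle.take() {
            // Release the GIL while blocking, so other Python threads in
            // the frontend keep running during shutdown.
            py.allow_threads(move || {
                let _ = handle.join();
            });
        }
    }
}
```

On the Python side this would pair with the shutdown sequence above: call it after `app.exec_()` returns and before `sys.exit()`.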