Skip to content

Commit 0c3914e

Browse files
Add joining to increase reliability when closing.
1 parent 9ff8acd commit 0c3914e

File tree

4 files changed

+88
-21
lines changed

4 files changed

+88
-21
lines changed

console_backend/src/constants.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ pub(crate) const APPLICATION_ORGANIZATION: &str = "swift-nav";
88
pub(crate) const APPLICATION_NAME: &str = "swift_navigation_console";
99
// CLI constants.
1010

11+
// Server constants.
12+
#[allow(dead_code)]
13+
pub(crate) const FETCH_MESSAGE_TIMEOUT_MS: u64 = 1;
14+
1115
// Process Message constants.
1216
pub(crate) const PAUSE_LOOP_SLEEP_DURATION_MS: u64 = 100;
1317

console_backend/src/process_messages.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ where
202202
}
203203
if conn.close_when_done() {
204204
shared_state.set_running(false, client_send.clone());
205+
shared_state.stop_server_running();
205206
close_frontend(&mut client_send);
206207
}
207208
Ok(())

console_backend/src/server.rs

Lines changed: 58 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,32 +8,36 @@ use async_logger_log::Logger;
88
use log::error;
99
use std::{
1010
io::{BufReader, Cursor},
11+
ops::Drop,
1112
path::PathBuf,
1213
str::FromStr,
1314
sync::mpsc,
1415
thread,
16+
time::Duration,
1517
};
1618

1719
use crate::cli_options::*;
20+
use crate::common_constants::ApplicationStates;
1821
use crate::connection::ConnectionState;
1922
use crate::console_backend_capnp as m;
20-
use crate::constants::LOG_WRITER_BUFFER_MESSAGE_COUNT;
23+
use crate::constants::{FETCH_MESSAGE_TIMEOUT_MS, LOG_WRITER_BUFFER_MESSAGE_COUNT};
2124
use crate::errors::*;
2225
use crate::log_panel::{splitable_log_formatter, LogLevel, LogPanelWriter};
2326
use crate::output::{CsvLogging, SbpLogging};
24-
use crate::types::{ClientSender, FlowControl, RealtimeDelay, SharedState};
27+
use crate::types::{ArcBool, ClientSender, FlowControl, RealtimeDelay, SharedState};
2528
use crate::utils::{refresh_loggingbar, refresh_navbar};
2629

27-
/// The backend server
28-
#[pyclass]
29-
struct Server {
30-
client_recv: Option<mpsc::Receiver<Vec<u8>>>,
31-
}
32-
3330
#[pyclass]
3431
struct ServerEndpoint {
3532
server_send: Option<mpsc::Sender<Vec<u8>>>,
3633
}
34+
impl Drop for ServerEndpoint {
35+
fn drop(&mut self) {
36+
if let Some(sender) = self.server_send.take() {
37+
drop(sender);
38+
}
39+
}
40+
}
3741

3842
#[pymethods]
3943
impl ServerEndpoint {
@@ -101,23 +105,44 @@ fn handle_cli(opt: CliOptions, connection_state: &ConnectionState, shared_state:
101105
}
102106
}
103107

108+
/// The backend server
109+
#[pyclass]
110+
struct Server {
111+
client_recv: Option<mpsc::Receiver<Vec<u8>>>,
112+
is_running: ArcBool,
113+
handle: Option<thread::JoinHandle<()>>,
114+
}
115+
impl Drop for Server {
116+
fn drop(&mut self) {
117+
self.is_running.set(false);
118+
119+
if let Some(recv) = self.client_recv.take() {
120+
drop(recv);
121+
}
122+
}
123+
}
124+
104125
#[pymethods]
105126
impl Server {
106127
#[new]
107128
pub fn __new__() -> Self {
108-
Server { client_recv: None }
129+
Server {
130+
client_recv: None,
131+
is_running: ArcBool::new(),
132+
handle: None,
133+
}
109134
}
110135

111136
#[text_signature = "($self, /)"]
112137
pub fn fetch_message(&mut self, py: Python) -> Option<PyObject> {
113138
let result = py.allow_threads(move || {
114139
let client_recv = self.client_recv.as_ref();
115140
if let Some(client_recv) = client_recv {
116-
let buf = client_recv.recv();
117-
if let Ok(buf) = buf {
141+
if let Ok(buf) =
142+
client_recv.recv_timeout(Duration::from_millis(FETCH_MESSAGE_TIMEOUT_MS))
143+
{
118144
Some(buf)
119145
} else {
120-
println!("error receiving message: {:?}", buf);
121146
None
122147
}
123148
} else {
@@ -156,7 +181,12 @@ impl Server {
156181
handle_cli(opt, &connection_state, shared_state.clone());
157182
refresh_navbar(&mut client_send.clone(), shared_state.clone());
158183
refresh_loggingbar(&mut client_send.clone(), shared_state.clone());
159-
thread::spawn(move || loop {
184+
self.is_running.set(true);
185+
let is_running = self.is_running.clone();
186+
self.handle = Some(thread::spawn(move || loop {
187+
if !is_running.get() {
188+
break;
189+
}
160190
let buf = server_recv.recv();
161191
if let Ok(buf) = buf {
162192
let mut buf_reader = BufReader::new(Cursor::new(buf));
@@ -176,6 +206,19 @@ impl Server {
176206
}
177207
};
178208
match message {
209+
m::message::Status(Ok(cv_in)) => {
210+
let status_req =
211+
cv_in.get_text().expect(CAP_N_PROTO_DESERIALIZATION_FAILURE);
212+
match ApplicationStates::from_str(status_req) {
213+
Ok(ApplicationStates::CLOSE) => {
214+
let shared_state_clone = shared_state.clone();
215+
shared_state_clone.stop_server_running();
216+
is_running.set(false);
217+
}
218+
Ok(_) => {}
219+
Err(_) => {}
220+
}
221+
}
179222
m::message::ConnectRequest(Ok(conn_req)) => {
180223
let request = conn_req
181224
.get_request()
@@ -328,7 +371,8 @@ impl Server {
328371
println!("error: {:?}", buf);
329372
break;
330373
}
331-
});
374+
}
375+
));
332376
Ok(server_endpoint)
333377
}
334378
}

src/main/python/main.py

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@
120120
PIKSI_HOST = "piksi-relay-bb9f2b10e53143f4a816a11884e679cf.ce.swiftnav.com"
121121
PIKSI_PORT = 55555
122122

123+
IS_RUNNING = True
123124

124125
MAIN_INDEX = "MAIN_INDEX"
125126
SUB_INDEX = "SUB_INDEX"
@@ -180,13 +181,15 @@
180181

181182

182183
def receive_messages(app_, backend, messages):
183-
while True:
184+
while IS_RUNNING:
184185
buffer = backend.fetch_message()
186+
if buffer is None:
187+
continue
185188
Message = messages.Message
186189
m = Message.from_bytes(buffer)
187190
if m.which == Message.Union.Status:
188191
if m.status.text == ApplicationStates.CLOSE:
189-
return app_.quit()
192+
break
190193
if m.status.text == ApplicationStates.CONNECTED:
191194
NAV_BAR[Keys.CONNECTED] = True
192195
elif m.status.text == ApplicationStates.DISCONNECTED:
@@ -298,7 +301,8 @@ def receive_messages(app_, backend, messages):
298301
log_panel_lock.unlock()
299302
else:
300303
pass
301-
304+
305+
app_.quit()
302306

303307
class DataModel(QObject):
304308

@@ -310,6 +314,14 @@ def __init__(self, endpoint, messages):
310314
self.endpoint = endpoint
311315
self.messages = messages
312316

317+
def status(self, status: ApplicationStates):
318+
Message = self.messages.Message
319+
msg = Message()
320+
msg.status = msg.init(Message.Union.Status)
321+
msg.status.text = str(status)
322+
buffer = msg.to_bytes()
323+
self.endpoint.send_message(buffer)
324+
313325
@Slot() # type: ignore
314326
def connect(self) -> None:
315327
self.connect_tcp(PIKSI_HOST, PIKSI_PORT)
@@ -584,14 +596,20 @@ def handle_cli_arguments(args: argparse.Namespace, globals_: QObject):
584596
globals_main = globals_main.property("globals") # type: ignore
585597

586598
handle_cli_arguments(args_main, globals_main)
587-
588-
threading.Thread(
599+
running_lock = threading.Lock()
600+
recv_thread = threading.Thread(
589601
target=receive_messages,
590602
args=(
591603
app,
592604
backend_main,
593605
messages_main,
594606
),
595607
daemon=True,
596-
).start()
597-
sys.exit(app.exec_())
608+
)
609+
recv_thread.start()
610+
app.exec_()
611+
data_model.status(ApplicationStates.CLOSE)
612+
with running_lock:
613+
IS_RUNNING = False
614+
recv_thread.join()
615+
sys.exit()

0 commit comments

Comments
 (0)