Skip to content

Commit 1b59862

Browse files
Apply local settings timeouts, global read_all_settings timeout[CPP-693] (#487)
* Apply local settings timeouts, global read_all_settings timeout[CPP-693] * Context Receiver Logic -> Watched ConnectionState monitor handle thread. (#491) Co-authored-by: Steven Meyer <[email protected]>
1 parent b479199 commit 1b59862

File tree

5 files changed

+113
-14
lines changed

5 files changed

+113
-14
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

console_backend/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ parking_lot = "0.12.0"
4141
regex = { version = "1.5.5" }
4242
rust-ini = "0.18.0"
4343
sbp = { version = "4.1.5", features = ["json", "link", "swiftnav"] }
44-
sbp-settings = "0.6.7"
44+
sbp-settings = "0.6.8"
4545
env_logger = { version = "0.9", optional = true }
4646
mimalloc = { version = "0.1", default-features = false }
4747
curl = { version = "0.4", features = ["ssl", "static-curl"] }

console_backend/src/connection.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,9 @@ fn process_messages_thd(
240240
if let Err(e) = res {
241241
error!("Connection error: {}", e);
242242
}
243-
if !matches!(shared_state.connection(), ConnectionState::Disconnected) {
243+
if !matches!(shared_state.connection(), ConnectionState::Disconnected)
244+
&& !shared_state.connection_dialog_visible()
245+
{
244246
manager_msg.send(ConnectionManagerMsg::Reconnect(conn));
245247
}
246248
}

console_backend/src/settings_tab.rs

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::borrow::Cow;
22
use std::collections::HashMap;
33
use std::fs;
44
use std::path::Path;
5-
use std::time::Duration;
5+
use std::time::{Duration, Instant};
66

77
use anyhow::anyhow;
88
use capnp::message::Builder;
@@ -32,6 +32,8 @@ const ETHERNET_INTERFACE_MODE_SETTING_KEY: &str = "interface_mode";
3232
const NTRIP_SETTING_GROUP: &str = "ntrip";
3333
const NTRIP_ENABLE_SETTING_KEY: &str = "enable";
3434

35+
const SETTINGS_READ_WRITE_TIMEOUT: Duration = Duration::from_millis(1000);
36+
3537
lazy_static! {
3638
static ref RECOMMENDED_INS_SETTINGS: [(&'static str, &'static str, SettingValue); 4] = [
3739
("imu", "imu_raw_output", SettingValue::Boolean(true)),
@@ -334,7 +336,7 @@ impl SettingsTab {
334336
let value = self
335337
.sbp_client
336338
.lock()
337-
.read_setting(setting.0, setting.1)?
339+
.read_setting_with_timeout(setting.0, setting.1, SETTINGS_READ_WRITE_TIMEOUT)?
338340
.ok_or_else(|| anyhow!("setting not found"))?
339341
.value;
340342
if value.as_ref() != Some(&setting.2) {
@@ -397,15 +399,20 @@ impl SettingsTab {
397399
}
398400
}
399401
}
400-
let setting = self.sbp_client.lock().write_setting(group, name, value)?;
402+
let setting = self.sbp_client.lock().write_setting_with_timeout(
403+
group,
404+
name,
405+
value,
406+
SETTINGS_READ_WRITE_TIMEOUT,
407+
)?;
401408
if matches!(
402409
setting.setting.kind,
403410
SettingKind::Float | SettingKind::Double
404411
) {
405412
let setting = self
406413
.sbp_client
407414
.lock()
408-
.read_setting(group, name)?
415+
.read_setting_with_timeout(group, name, SETTINGS_READ_WRITE_TIMEOUT)?
409416
.ok_or_else(|| anyhow!("setting not found"))?;
410417
self.settings.lock().insert(setting);
411418
} else {
@@ -419,9 +426,29 @@ impl SettingsTab {
419426
}
420427

421428
fn read_all_settings(&self) {
422-
const TIMEOUT: Duration = Duration::from_secs(15);
429+
const GLOBAL_TIMEOUT: Duration = Duration::from_secs(15);
430+
431+
let (ctx, handle) = Context::with_timeout(SETTINGS_READ_WRITE_TIMEOUT);
432+
433+
let mut conn = self.shared_state.watch_connection();
434+
// this thread runs for at most `GLOBAL_TIMEOUT` seconds so we don't join it to avoid
435+
// blocking the completion of this function
436+
let _monitor_handle = std::thread::spawn(move || {
437+
let t = Instant::now();
438+
loop {
439+
let waited = t.elapsed();
440+
if waited >= GLOBAL_TIMEOUT {
441+
break;
442+
}
443+
if let Ok(Ok(conn)) = conn.wait_for(GLOBAL_TIMEOUT - waited) {
444+
if !conn.is_connected() {
445+
break;
446+
}
447+
}
448+
}
449+
handle.cancel();
450+
});
423451

424-
let (ctx, _handle) = Context::with_timeout(TIMEOUT);
425452
let (settings, errors) = self.sbp_client.lock().read_all_ctx(ctx);
426453
for e in errors {
427454
warn!("{}", e);

console_backend/src/watch.rs

Lines changed: 74 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::{
44
atomic::{AtomicBool, AtomicUsize, Ordering},
55
Arc,
66
},
7+
time::{Duration, Instant},
78
};
89

910
use parking_lot::{Condvar, MappedMutexGuard, Mutex, MutexGuard};
@@ -117,6 +118,14 @@ impl<T: Clone> WatchReceiver<T> {
117118
Ok(data.value.clone())
118119
}
119120

121+
pub fn wait_for(&mut self, duration: Duration) -> Result<Result<T, RecvError>, TimeoutError> {
122+
let data = self.wait_for_inner(duration)?;
123+
match data {
124+
Ok(data) => Ok(Ok(data.value.clone())),
125+
Err(e) => Ok(Err(e)),
126+
}
127+
}
128+
120129
pub fn wait_mut(&mut self) -> Result<MappedMutexGuard<T>, RecvError> {
121130
let data = self.wait_inner()?;
122131
Ok(MutexGuard::map(data, |d| &mut d.value))
@@ -155,6 +164,36 @@ impl<T: Clone> WatchReceiver<T> {
155164
self.last_seen = data.version;
156165
Ok(data)
157166
}
167+
168+
fn wait_for_inner(
169+
&mut self,
170+
duration: Duration,
171+
) -> Result<Result<MutexGuard<Value<T>>, RecvError>, TimeoutError> {
172+
if self.shared.is_closed() {
173+
return Ok(Err(RecvError));
174+
}
175+
let t = Instant::now();
176+
let mut data = self.shared.data.lock();
177+
while data.version == self.last_seen {
178+
let elapsed = t.elapsed();
179+
if elapsed >= duration {
180+
return Err(TimeoutError);
181+
}
182+
if self
183+
.shared
184+
.on_update
185+
.wait_for(&mut data, duration - elapsed)
186+
.timed_out()
187+
{
188+
return Err(TimeoutError);
189+
}
190+
if self.shared.is_closed() {
191+
return Ok(Err(RecvError));
192+
}
193+
}
194+
self.last_seen = data.version;
195+
Ok(Ok(data))
196+
}
158197
}
159198

160199
impl<T> Clone for WatchReceiver<T> {
@@ -200,6 +239,17 @@ impl fmt::Display for RecvError {
200239

201240
impl std::error::Error for RecvError {}
202241

242+
#[derive(Debug)]
243+
pub struct TimeoutError;
244+
245+
impl fmt::Display for TimeoutError {
246+
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
247+
write!(fmt, "time out waiting for watched value")
248+
}
249+
}
250+
251+
impl std::error::Error for TimeoutError {}
252+
203253
#[cfg(test)]
204254
mod tests {
205255
use std::{thread, time::Duration};
@@ -208,6 +258,8 @@ mod tests {
208258

209259
use super::*;
210260

261+
const SECOND: Duration = Duration::from_secs(1);
262+
211263
#[test]
212264
fn starts_unseen() {
213265
let watched = Watched::new(0);
@@ -255,15 +307,15 @@ mod tests {
255307
s.send(()).unwrap()
256308
});
257309

258-
thread::sleep(Duration::from_secs(1));
310+
thread::sleep(SECOND);
259311
assert!(r.try_recv().is_err());
260312

261313
watched.send(1);
262-
thread::sleep(Duration::from_secs(1));
314+
thread::sleep(SECOND);
263315
assert!(r.try_recv().is_err());
264316

265317
watched.send(2);
266-
thread::sleep(Duration::from_secs(1));
318+
thread::sleep(SECOND);
267319
assert!(r.try_recv().is_ok());
268320
}
269321

@@ -274,12 +326,30 @@ mod tests {
274326
// mark first as seen
275327
let _ = recv.get();
276328
thread::spawn(move || {
277-
thread::sleep(Duration::from_secs(1));
329+
thread::sleep(SECOND);
278330
drop(watched);
279331
});
280332
assert!(recv.wait().is_err());
281333
}
282334

335+
#[test]
336+
fn wait_for() {
337+
let watched = Watched::new(0);
338+
let mut recv = watched.watch();
339+
let h = thread::spawn(move || {
340+
// mark first as seen
341+
let _ = recv.get();
342+
let res = recv.wait_for(SECOND / 2);
343+
assert!(matches!(res, Err(TimeoutError)));
344+
// wait for rest of the second plus a little bit
345+
let res = recv.wait_for(SECOND / 2 + Duration::from_millis(100));
346+
assert!(matches!(res, Ok(Ok(1))));
347+
});
348+
thread::sleep(SECOND);
349+
watched.send(1);
350+
h.join().unwrap();
351+
}
352+
283353
#[test]
284354
fn multiple_consumers() {
285355
let watched = Watched::new(0);

0 commit comments

Comments
 (0)