Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions console_backend/src/settings_tab.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,14 @@ lazy_static! {

pub fn start_thd<S: CapnProtoSender>(tab: &SettingsTab<S>) {
let mut recv = tab.shared_state.watch_settings_state();
while let Ok(state) = recv.wait() {
tab.shared_state.set_settings_state(Default::default());
tick(tab, state);
while let Ok(mut guard) = recv.wait_mut() {
let state = &mut *guard;
// taking the state reverts the settings state back to its default value
// without triggering another update (which would cause this to loop forever)
let s = std::mem::take(state);
// drop the guard so the settings state can get updated while`tick` runs (which might take a while)
drop(guard);
tick(tab, s);
}
}

Expand Down
71 changes: 39 additions & 32 deletions console_backend/src/watch.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::fmt;
use std::sync::atomic::AtomicUsize;
use std::sync::Weak;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
use std::{
fmt,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
};

use parking_lot::{Condvar, Mutex};
use parking_lot::{Condvar, MappedMutexGuard, Mutex, MutexGuard};

pub struct Watched<T> {
shared: Arc<Shared<T>>,
Expand Down Expand Up @@ -43,7 +43,7 @@ impl<T: Clone> Watched<T> {
data.version
};
WatchReceiver {
shared: Arc::downgrade(&self.shared),
shared: Arc::clone(&self.shared),
last_seen: version.wrapping_sub(1),
}
}
Expand Down Expand Up @@ -85,21 +85,25 @@ impl<T> Drop for Watched<T> {
}

pub struct WatchReceiver<T> {
shared: Weak<Shared<T>>,
shared: Arc<Shared<T>>,
last_seen: u64,
}

impl<T: Clone> WatchReceiver<T> {
pub fn get(&mut self) -> Result<T, RecvError> {
let shared = Shared::upgrade(&self.shared)?;
let data = shared.data.lock();
if self.shared.is_closed() {
return Err(RecvError);
}
let data = self.shared.data.lock();
self.last_seen = data.version;
Ok(data.value.clone())
}

pub fn get_if_new(&mut self) -> Result<Option<T>, RecvError> {
let shared = Shared::upgrade(&self.shared)?;
let data = shared.data.lock();
if self.shared.is_closed() {
return Err(RecvError);
}
let data = self.shared.data.lock();
if self.last_seen == data.version {
Ok(None)
} else {
Expand All @@ -109,18 +113,15 @@ impl<T: Clone> WatchReceiver<T> {
}

pub fn wait(&mut self) -> Result<T, RecvError> {
let shared = Shared::upgrade(&self.shared)?;
let mut data = shared.data.lock();
while data.version == self.last_seen {
shared.on_update.wait(&mut data);
if shared.is_closed() {
return Err(RecvError);
}
}
self.last_seen = data.version;
let data = self.wait_inner()?;
Ok(data.value.clone())
}

pub fn wait_mut(&mut self) -> Result<MappedMutexGuard<T>, RecvError> {
let data = self.wait_inner()?;
Ok(MutexGuard::map(data, |d| &mut d.value))
}

pub fn wait_until<F>(&mut self, mut f: F) -> Result<T, RecvError>
where
F: FnMut(&T) -> bool,
Expand All @@ -139,12 +140,27 @@ impl<T: Clone> WatchReceiver<T> {
{
self.wait_until(|v| !f(v))
}

fn wait_inner(&mut self) -> Result<MutexGuard<Value<T>>, RecvError> {
if self.shared.is_closed() {
return Err(RecvError);
}
let mut data = self.shared.data.lock();
while data.version == self.last_seen {
self.shared.on_update.wait(&mut data);
if self.shared.is_closed() {
return Err(RecvError);
}
}
self.last_seen = data.version;
Ok(data)
}
}

impl<T> Clone for WatchReceiver<T> {
fn clone(&self) -> WatchReceiver<T> {
WatchReceiver {
shared: Weak::clone(&self.shared),
shared: Arc::clone(&self.shared),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't be a weak ref if we want to return borrowed data. I don't think it was actually needed anyway

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As long as the watched value doesn't somehow hold a reference to the watcher then we should be fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think that should be impossible with the current setup. My original idea was that if you drop the watched value the receivers are useless so might as well free the memory. In practice I think the watched values and the receivers are going to be dropped at the same time anyway (when process_messages ends)

last_seen: self.last_seen,
}
}
Expand All @@ -166,15 +182,6 @@ impl<T> Shared<T> {
fn is_closed(&self) -> bool {
self.closed.load(Ordering::SeqCst)
}

fn upgrade(me: &Weak<Self>) -> Result<Arc<Self>, RecvError> {
let shared = me.upgrade().ok_or(RecvError)?;
if shared.is_closed() {
Err(RecvError)
} else {
Ok(shared)
}
}
}

struct Value<T> {
Expand Down