From 2fb70663edfcc0badb86f9ac3df4a7eb22dcd987 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Sat, 10 Sep 2022 16:06:07 +0530 Subject: [PATCH] Set max thread priority for local-sync. localsync thread is important and should run at exact intervals. To ensure this its priority is set to max. Priority setting if fails is ignored for now which means if thread_priority can't set it to max then all can be done is log it. Scheduling behaviour is dependant on the underlying OS. It is user's responsiblity to make sure that program can set thread/process priority. For example on linux you'd have to allow `cpu cpuset` cgroup for current user. --- server/Cargo.toml | 1 + server/src/main.rs | 61 ++++++++++++++++++++++++++-------------------- 2 files changed, 36 insertions(+), 26 deletions(-) diff --git a/server/Cargo.toml b/server/Cargo.toml index c48b8d963..e061ded1c 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -40,6 +40,7 @@ serde_json = "^1.0.8" structopt = { version = "0.3.25" } sysinfo = "0.20.5" thiserror = "1" +thread-priority = "0.9.2" tokio-stream = "0.1.8" tokio = { version = "1.13.1", default-features = false, features=["sync", "macros"] } clokwerk = "0.4.0-rc1" diff --git a/server/src/main.rs b/server/src/main.rs index 919e2abc1..d5f3320ef 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -27,6 +27,7 @@ use clokwerk::{AsyncScheduler, Scheduler, TimeUnits}; use filetime::FileTime; use log::warn; use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod}; +use thread_priority::{ThreadBuilder, ThreadPriority}; include!(concat!(env!("OUT_DIR"), "/generated.rs")); @@ -211,35 +212,43 @@ fn run_local_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<( let (outbox_tx, outbox_rx) = oneshot::channel::<()>(); let (inbox_tx, inbox_rx) = oneshot::channel::<()>(); let mut inbox_rx = AssertUnwindSafe(inbox_rx); - let handle = thread::spawn(move || { - let res = catch_unwind(move || { - let mut scheduler = Scheduler::new(); - scheduler - .every((storage::LOCAL_SYNC_INTERVAL as u32).seconds()) - .run(move || { - if let Err(e) = S3::new().local_sync() { - warn!("failed to sync local data. {:?}", e); - } - }); - - loop { - thread::sleep(Duration::from_millis(50)); - scheduler.run_pending(); - match AssertUnwindSafe(|| inbox_rx.try_recv())() { - Ok(_) => break, - Err(TryRecvError::Empty) => continue, - Err(TryRecvError::Closed) => { - // should be unreachable but breaking anyways - break; + + let handle = ThreadBuilder::default() + .name("local-sync") + .priority(ThreadPriority::Max) + .spawn(move |priority_result| { + if priority_result.is_err() { + log::warn!("Max priority cannot be set for sync thread. Make sure that user/program is allowed to set thread priority.") + } + let res = catch_unwind(move || { + let mut scheduler = Scheduler::new(); + scheduler + .every((storage::LOCAL_SYNC_INTERVAL as u32).seconds()) + .run(move || { + if let Err(e) = S3::new().local_sync() { + warn!("failed to sync local data. {:?}", e); + } + }); + + loop { + thread::sleep(Duration::from_millis(50)); + scheduler.run_pending(); + match AssertUnwindSafe(|| inbox_rx.try_recv())() { + Ok(_) => break, + Err(TryRecvError::Empty) => continue, + Err(TryRecvError::Closed) => { + // should be unreachable but breaking anyways + break; + } } } - } - }); + }); - if res.is_err() { - outbox_tx.send(()).unwrap(); - } - }); + if res.is_err() { + outbox_tx.send(()).unwrap(); + } + }) + .unwrap(); (handle, outbox_rx, inbox_tx) }