diff --git a/server/Cargo.toml b/server/Cargo.toml
index 047d6aab3..d22af6427 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -44,6 +44,7 @@ structopt = { version = "0.3.25" }
 sysinfo = "0.20.5"
 thiserror = "1"
 tokio-stream = "0.1.8"
+tokio = { version = "1.13.1", default-features = false, features=["sync", "macros"] }
 clokwerk = "0.4.0-rc1"
 actix-web-static-files = "4.0"
 static-files = "0.2.1"
diff --git a/server/src/main.rs b/server/src/main.rs
index 9be755e71..e1a8d347e 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -22,13 +22,17 @@ use actix_web::{middleware, web, App, HttpServer};
 use actix_web_httpauth::extractors::basic::BasicAuth;
 use actix_web_httpauth::middleware::HttpAuthentication;
 use actix_web_static_files::ResourceFiles;
-use clokwerk::{AsyncScheduler, TimeUnits};
+use clokwerk::{AsyncScheduler, Scheduler, TimeUnits};
 use log::warn;
 use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};
 
 include!(concat!(env!("OUT_DIR"), "/generated.rs"));
 
-use std::thread;
+use std::panic::{catch_unwind, AssertUnwindSafe};
+use std::thread::{self, JoinHandle};
+use std::time::Duration;
+use tokio::sync::oneshot;
+use tokio::sync::oneshot::error::TryRecvError;
 
 mod banner;
 mod error;
@@ -63,33 +67,110 @@ async fn main() -> anyhow::Result<()> {
     if let Err(e) = metadata::STREAM_INFO.load(&storage).await {
         warn!("could not populate local metadata. {:?}", e);
     }
-    thread::spawn(sync);
-    run_http().await?;
-    Ok(())
-}
+    let (localsync_handler, mut localsync_outbox, localsync_inbox) = run_local_sync();
+    let (mut s3sync_handler, mut s3sync_outbox, mut s3sync_inbox) = s3_sync();
 
-#[actix_web::main]
-async fn sync() {
-    let mut scheduler = AsyncScheduler::new();
-    scheduler
-        .every((storage::LOCAL_SYNC_INTERVAL as u32).seconds())
-        .run(|| async {
-            if let Err(e) = S3::new().local_sync().await {
-                warn!("failed to sync local data. {:?}", e);
+    let app = run_http();
+    tokio::pin!(app);
+    loop {
+        tokio::select! {
+            e = &mut app => {
+                // actix server finished; stop the other threads and stop the server
+                s3sync_inbox.send(()).unwrap_or(());
+                localsync_inbox.send(()).unwrap_or(());
+                localsync_handler.join().unwrap_or(());
+                s3sync_handler.join().unwrap_or(());
+                return e
+            },
+            _ = &mut localsync_outbox => {
+                // crash the server if localsync fails for any reason
+                // panic!("Local Sync thread died. Server will fail now!")
+                return Err(anyhow::Error::msg("Failed to sync local data to disk. This can happen due to a critical error on disk or in the environment. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart: https://launchpass.com/parseable"))
+            },
+            _ = &mut s3sync_outbox => {
+                // s3sync failed; this is recoverable by simply starting the s3sync thread again
+                s3sync_handler.join().unwrap_or(());
+                (s3sync_handler, s3sync_outbox, s3sync_inbox) = s3_sync();
             }
+        };
+    }
+}
+
+fn s3_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) {
+    let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
+    let (inbox_tx, inbox_rx) = oneshot::channel::<()>();
+    let mut inbox_rx = AssertUnwindSafe(inbox_rx);
+    let handle = thread::spawn(move || {
+        let res = catch_unwind(move || {
+            let rt = actix_web::rt::System::new();
+            rt.block_on(async {
+                let mut scheduler = AsyncScheduler::new();
+                scheduler
+                    .every((CONFIG.parseable.upload_interval as u32).seconds())
+                    .run(|| async {
+                        if let Err(e) = S3::new().s3_sync().await {
+                            warn!("failed to sync local data with object store. {:?}", e);
+                        }
+                    });
+
+                loop {
+                    scheduler.run_pending().await;
+                    match AssertUnwindSafe(|| inbox_rx.try_recv())() {
+                        Ok(_) => break,
+                        Err(TryRecvError::Empty) => continue,
+                        Err(TryRecvError::Closed) => {
+                            // should be unreachable but break anyway
+                            break;
+                        }
+                    }
+                }
+            })
         });
 
-    scheduler
-        .every((CONFIG.parseable.upload_interval as u32).seconds())
-        .run(|| async {
-            if let Err(e) = S3::new().s3_sync().await {
-                warn!("failed to sync local data with object store. {:?}", e);
+        if res.is_err() {
+            outbox_tx.send(()).unwrap();
+        }
+    });
+
+    (handle, outbox_rx, inbox_tx)
+}
+
+fn run_local_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) {
+    let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
+    let (inbox_tx, inbox_rx) = oneshot::channel::<()>();
+    let mut inbox_rx = AssertUnwindSafe(inbox_rx);
+    let handle = thread::spawn(move || {
+        let res = catch_unwind(move || {
+            let mut scheduler = Scheduler::new();
+            scheduler
+                .every((storage::LOCAL_SYNC_INTERVAL as u32).seconds())
+                .run(move || {
+                    if let Err(e) = S3::new().local_sync() {
+                        warn!("failed to sync local data. {:?}", e);
+                    }
+                });
+
+            loop {
+                thread::sleep(Duration::from_millis(50));
+                scheduler.run_pending();
+                match AssertUnwindSafe(|| inbox_rx.try_recv())() {
+                    Ok(_) => break,
+                    Err(TryRecvError::Empty) => continue,
+                    Err(TryRecvError::Closed) => {
+                        // should be unreachable but break anyway
+                        break;
+                    }
+                }
             }
         });
 
-    loop {
-        scheduler.run_pending().await;
-    }
+        if res.is_err() {
+            outbox_tx.send(()).unwrap();
+        }
+    });
+
+    (handle, outbox_rx, inbox_tx)
 }
 
 async fn validator(
diff --git a/server/src/storage.rs b/server/src/storage.rs
index c31e01744..7dc8c0183 100644
--- a/server/src/storage.rs
+++ b/server/src/storage.rs
@@ -63,7 +63,7 @@ pub trait ObjectStorage: Sync + 'static {
         query: &Query,
        results: &mut Vec,
     ) -> Result<(), ObjectStorageError>;
-    async fn local_sync(&self) -> io::Result<()> {
+    fn local_sync(&self) -> io::Result<()> {
         // If the local data path doesn't exist yet, return early.
         // This method will be called again after next ticker interval
         if !Path::new(&CONFIG.parseable.local_disk_path).exists() {