Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 26 additions & 37 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion helm/templates/ingestor-statefulset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,4 @@ spec:
requests:
storage: {{ .Values.parseable.persistence.ingestor.size | quote }}
{{- end }}
{{- end }}
{{- end }}
19 changes: 4 additions & 15 deletions server/src/handlers/http/health_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,10 @@ use lazy_static::lazy_static;
use std::sync::Arc;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::{oneshot, Mutex};
use tokio::time::{sleep, Duration};

// Create a global variable to store signal status
lazy_static! {
static ref SIGNAL_RECEIVED: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
pub static ref SIGNAL_RECEIVED: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
}

pub async fn liveness() -> HttpResponse {
Expand Down Expand Up @@ -66,23 +65,13 @@ pub async fn handle_signals(shutdown_signal: Arc<Mutex<Option<oneshot::Sender<()
let mut shutdown_flag = SIGNAL_RECEIVED.lock().await;
*shutdown_flag = true;

// Trigger graceful shutdown
if let Some(shutdown_sender) = shutdown_signal.lock().await.take() {
let _ = shutdown_sender.send(());
}

// Delay to allow readiness probe to return SERVICE_UNAVAILABLE
let _ = sleep(Duration::from_secs(20)).await;

// Sync to local
crate::event::STREAM_WRITERS.unset_all();

// Sync to S3
if let Err(e) = CONFIG.storage().get_object_store().sync().await {
log::warn!("Failed to sync local data with object store. {:?}", e);
// Trigger graceful shutdown
if let Some(shutdown_sender) = shutdown_signal.lock().await.take() {
let _ = shutdown_sender.send(());
}

log::info!("Local and S3 Sync done, handler SIGTERM completed.");
}
None => {
log::info!("Signal handler received None, indicating an error or end of stream");
Expand Down
35 changes: 29 additions & 6 deletions server/src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ impl ParseableServer for IngestServer {
let shutdown_signal = server_shutdown_signal.clone();

// Spawn the signal handler task
tokio::spawn(async move {
let signal_task = tokio::spawn(async move {
health_check::handle_signals(shutdown_signal).await;
println!("Received shutdown signal, notifying server to shut down...");
log::info!("Received shutdown signal, notifying server to shut down...");
});

// Create the HTTP server
Expand All @@ -134,17 +134,40 @@ impl ParseableServer for IngestServer {
// Graceful shutdown handling
let srv_handle = srv.handle();

tokio::spawn(async move {
let sync_task = tokio::spawn(async move {
// Wait for the shutdown signal
shutdown_rx.await.ok();
let _ = shutdown_rx.await;

// Perform S3 sync and wait for completion
log::info!("Starting data sync to S3...");
if let Err(e) = CONFIG.storage().get_object_store().sync(true).await {
log::warn!("Failed to sync local data with object store. {:?}", e);
} else {
log::info!("Successfully synced all data to S3.");
}

// Initiate graceful shutdown
log::info!("Graceful shutdown of HTTP server triggered");
srv_handle.stop(true).await;
});

// Await the server to run and handle shutdown
srv.await?;
// Await the HTTP server to run
let server_result = srv.await;

// Await the signal handler to ensure proper cleanup
if let Err(e) = signal_task.await {
log::error!("Error in signal handler: {:?}", e);
}

// Wait for the sync task to complete before exiting
if let Err(e) = sync_task.await {
log::error!("Error in sync task: {:?}", e);
} else {
log::info!("Sync task completed successfully.");
}

// Return the result of the server
server_result?;

Ok(())
}
Expand Down
34 changes: 29 additions & 5 deletions server/src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,9 @@ impl ParseableServer for QueryServer {
let shutdown_signal = server_shutdown_signal.clone();

// Spawn the signal handler task
tokio::spawn(async move {
let signal_task = tokio::spawn(async move {
health_check::handle_signals(shutdown_signal).await;
log::info!("Received shutdown signal, notifying server to shut down...");
});

// Create the HTTP server
Expand All @@ -110,17 +111,40 @@ impl ParseableServer for QueryServer {
// Graceful shutdown handling
let srv_handle = srv.handle();

tokio::spawn(async move {
let sync_task = tokio::spawn(async move {
// Wait for the shutdown signal
shutdown_rx.await.ok();
let _ = shutdown_rx.await;

// Perform S3 sync and wait for completion
log::info!("Starting data sync to S3...");
if let Err(e) = CONFIG.storage().get_object_store().sync(true).await {
log::warn!("Failed to sync local data with object store. {:?}", e);
} else {
log::info!("Successfully synced all data to S3.");
}

// Initiate graceful shutdown
log::info!("Graceful shutdown of HTTP server triggered");
srv_handle.stop(true).await;
});

// Await the server to run and handle shutdown
srv.await?;
// Await the HTTP server to run
let server_result = srv.await;

// Await the signal handler to ensure proper cleanup
if let Err(e) = signal_task.await {
log::error!("Error in signal handler: {:?}", e);
}

// Wait for the sync task to complete before exiting
if let Err(e) = sync_task.await {
log::error!("Error in sync task: {:?}", e);
} else {
log::info!("Sync task completed successfully.");
}

// Return the result of the server
server_result?;

Ok(())
}
Expand Down
36 changes: 29 additions & 7 deletions server/src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ use super::generate;
use super::ssl_acceptor::get_ssl_acceptor;
use super::OpenIdClient;
use super::ParseableServer;

#[derive(Default)]
pub struct Server;

Expand Down Expand Up @@ -110,9 +109,9 @@ impl ParseableServer for Server {
let shutdown_signal = server_shutdown_signal.clone();

// Spawn the signal handler task
tokio::spawn(async move {
let signal_task = tokio::spawn(async move {
health_check::handle_signals(shutdown_signal).await;
println!("Received shutdown signal, notifying server to shut down...");
log::info!("Received shutdown signal, notifying server to shut down...");
});

// Create the HTTP server
Expand All @@ -132,17 +131,40 @@ impl ParseableServer for Server {
// Graceful shutdown handling
let srv_handle = srv.handle();

tokio::spawn(async move {
let sync_task = tokio::spawn(async move {
// Wait for the shutdown signal
shutdown_rx.await.ok();
let _ = shutdown_rx.await;

// Perform S3 sync and wait for completion
log::info!("Starting data sync to S3...");
if let Err(e) = CONFIG.storage().get_object_store().sync(true).await {
log::warn!("Failed to sync local data with object store. {:?}", e);
} else {
log::info!("Successfully synced all data to S3.");
}

// Initiate graceful shutdown
log::info!("Graceful shutdown of HTTP server triggered");
srv_handle.stop(true).await;
});

// Await the server to run and handle shutdown
srv.await?;
// Await the HTTP server to run
let server_result = srv.await;

// Await the signal handler to ensure proper cleanup
if let Err(e) = signal_task.await {
log::error!("Error in signal handler: {:?}", e);
}

// Wait for the sync task to complete before exiting
if let Err(e) = sync_task.await {
log::error!("Error in sync task: {:?}", e);
} else {
log::info!("Sync task completed successfully.");
}

// Return the result of the server
server_result?;

Ok(())
}
Expand Down
Loading
Loading