Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 35 additions & 24 deletions server/src/handlers/http/health_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use actix_web::middleware::Next;
use actix_web::{Error, HttpResponse};
use lazy_static::lazy_static;
use std::sync::Arc;
use tokio::signal::unix::{signal, SignalKind};
use tokio::signal::ctrl_c;

use tokio::sync::{oneshot, Mutex};

// Create a global variable to store signal status
Expand All @@ -52,35 +53,45 @@ pub async fn check_shutdown_middleware(
}

pub async fn handle_signals(shutdown_signal: Arc<Mutex<Option<oneshot::Sender<()>>>>) {
let mut sigterm =
signal(SignalKind::terminate()).expect("Failed to set up SIGTERM signal handler");
log::info!("Signal handler task started");

// Block until SIGTERM is received
match sigterm.recv().await {
Some(_) => {
log::info!("Received SIGTERM signal at Readiness Probe Handler");

// Set the shutdown flag to true
let mut shutdown_flag = SIGNAL_RECEIVED.lock().await;
*shutdown_flag = true;

// Sync to local
crate::event::STREAM_WRITERS.unset_all();

// Trigger graceful shutdown
if let Some(shutdown_sender) = shutdown_signal.lock().await.take() {
let _ = shutdown_sender.send(());
#[cfg(windows)]
{
tokio::select! {
_ = ctrl_c() => {
log::info!("Received SIGINT signal at Readiness Probe Handler");
shutdown(shutdown_signal).await;
}
}
None => {
log::info!("Signal handler received None, indicating an error or end of stream");
}
#[cfg(unix)]
{
use tokio::signal::unix::{signal, SignalKind};
let mut sigterm = signal(SignalKind::terminate()).unwrap();
tokio::select! {
_ = ctrl_c() => {
log::info!("Received SIGINT signal at Readiness Probe Handler");
shutdown(shutdown_signal).await;
},
_ = sigterm.recv() => {
log::info!("Received SIGTERM signal at Readiness Probe Handler");
shutdown(shutdown_signal).await;
}
}
}

log::info!("Signal handler task completed");
}

async fn shutdown(shutdown_signal: Arc<Mutex<Option<oneshot::Sender<()>>>>) {
// Set the shutdown flag to true
let mut shutdown_flag = SIGNAL_RECEIVED.lock().await;
*shutdown_flag = true;

// Sync to local
crate::event::STREAM_WRITERS.unset_all();

// Trigger graceful shutdown
if let Some(shutdown_sender) = shutdown_signal.lock().await.take() {
let _ = shutdown_sender.send(());
}
}
pub async fn readiness() -> HttpResponse {
// Check the object store connection
if CONFIG.storage().get_object_store().check().await.is_ok() {
Expand Down
24 changes: 18 additions & 6 deletions server/src/query/stream_schema_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,24 @@ fn partitioned_files(
// object_store::path::Path doesn't automatically deal with Windows path separators
// to do that, we are using from_absolute_path() which takes into consideration the underlying filesystem
// before sending the file path to PartitionedFile
let pf = if CONFIG.storage_name.eq("drive") {
let file_path = object_store::path::Path::from_absolute_path(file_path).unwrap();
PartitionedFile::new(file_path, file.file_size)
} else {
PartitionedFile::new(file_path, file.file_size)
};
// the github issue- https://github.com/parseablehq/parseable/issues/824
// For some reason, the `from_absolute_path()` doesn't work for macos, hence the ugly solution
// TODO: figure out an elegant solution to this
let pf;

#[cfg(unix)]
{
pf = PartitionedFile::new(file_path, file.file_size);
}
#[cfg(windows)]
{
pf = if CONFIG.storage_name.eq("drive") {
let file_path = object_store::path::Path::from_absolute_path(file_path).unwrap();
PartitionedFile::new(file_path, file.file_size)
} else {
PartitionedFile::new(file_path, file.file_size)
};
}

partitioned_files[index].push(pf);
columns.into_iter().for_each(|col| {
Expand Down
Loading