diff --git a/server/src/catalog.rs b/server/src/catalog.rs index 7bab0c11d..7fd4aff93 100644 --- a/server/src/catalog.rs +++ b/server/src/catalog.rs @@ -34,9 +34,8 @@ use std::io::Error as IOError; pub mod column; pub mod manifest; pub mod snapshot; - +use crate::storage::ObjectStoreFormat; pub use manifest::create_from_parquet_file; - pub trait Snapshot { fn manifests(&self, time_predicates: &[PartialTimeFilter]) -> Vec; } @@ -97,7 +96,6 @@ pub async fn update_snapshot( // get current snapshot let mut meta = storage.get_object_store_format(stream_name).await?; let manifests = &mut meta.snapshot.manifest_list; - let (lower_bound, _) = get_file_bounds(&change); let pos = manifests.iter().position(|item| { item.time_lower_bound <= lower_bound && lower_bound < item.time_upper_bound @@ -120,82 +118,89 @@ pub async fn update_snapshot( } } if ch { - let Some(mut manifest) = storage.get_manifest(&path).await? else { - return Err(ObjectStorageError::UnhandledError( - "Manifest found in snapshot but not in object-storage" - .to_string() - .into(), - )); - }; - manifest.apply_change(change); - storage.put_manifest(&path, manifest).await?; - } else { - let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc(); - let upper_bound = lower_bound - .date_naive() - .and_time( - NaiveTime::from_num_seconds_from_midnight_opt( - 23 * 3600 + 59 * 60 + 59, - 999_999_999, - ) - .ok_or(IOError::new( - ErrorKind::Other, - "Failed to create upper bound for manifest", - )) - .map_err(ObjectStorageError::IoError)?, + if let Some(mut manifest) = storage.get_manifest(&path).await? { + manifest.apply_change(change); + storage.put_manifest(&path, manifest).await?; + } else { + //instead of returning an error, create a new manifest (otherwise local to storage sync fails) + //but don't update the snapshot + create_manifest( + lower_bound, + change, + storage.clone(), + stream_name, + false, + meta, ) - .and_utc(); - - let manifest = Manifest { - files: vec![change], - ..Manifest::default() - }; - - let mainfest_file_name = manifest_path("").to_string(); - let path = - partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name); - storage - .put_object(&path, serde_json::to_vec(&manifest)?.into()) .await?; - let path = storage.absolute_url(&path); - let new_snapshot_entriy = snapshot::ManifestItem { - manifest_path: path.to_string(), - time_lower_bound: lower_bound, - time_upper_bound: upper_bound, - }; - manifests.push(new_snapshot_entriy); - storage.put_snapshot(stream_name, meta.snapshot).await?; + } + } else { + create_manifest( + lower_bound, + change, + storage.clone(), + stream_name, + true, + meta, + ) + .await?; } } else { - let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc(); - let upper_bound = lower_bound - .date_naive() - .and_time( - NaiveTime::from_num_seconds_from_midnight_opt( - 23 * 3600 + 59 * 60 + 59, - 999_999_999, - ) - .unwrap(), - ) - .and_utc(); + create_manifest( + lower_bound, + change, + storage.clone(), + stream_name, + true, + meta, + ) + .await?; + } - let manifest = Manifest { - files: vec![change], - ..Manifest::default() - }; + Ok(()) +} - let mainfest_file_name = manifest_path("").to_string(); - let path = partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name); - storage - .put_object(&path, serde_json::to_vec(&manifest).unwrap().into()) - .await?; +async fn create_manifest( + lower_bound: DateTime, + change: manifest::File, + storage: Arc, + stream_name: &str, + update_snapshot: bool, + mut meta: ObjectStoreFormat, +) -> Result<(), ObjectStorageError> { + let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc(); + let upper_bound = lower_bound + .date_naive() + .and_time( + NaiveTime::from_num_seconds_from_midnight_opt(23 * 3600 + 59 * 60 + 59, 999_999_999) + .ok_or(IOError::new( + ErrorKind::Other, + "Failed to create upper bound for manifest", + )) + .map_err(ObjectStorageError::IoError)?, + ) + .and_utc(); + + let manifest = Manifest { + files: vec![change], + ..Manifest::default() + }; + + let mainfest_file_name = manifest_path("").to_string(); + let path = partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name); + storage + .put_object(&path, serde_json::to_vec(&manifest)?.into()) + .await?; + if update_snapshot { + let mut manifests = meta.snapshot.manifest_list; let path = storage.absolute_url(&path); - let new_snapshot_entriy = snapshot::ManifestItem { + let new_snapshot_entry = snapshot::ManifestItem { manifest_path: path.to_string(), time_lower_bound: lower_bound, time_upper_bound: upper_bound, }; - manifests.push(new_snapshot_entriy); + manifests.push(new_snapshot_entry); + meta.snapshot.manifest_list = manifests; storage.put_snapshot(stream_name, meta.snapshot).await?; } @@ -207,21 +212,17 @@ pub async fn remove_manifest_from_snapshot( stream_name: &str, dates: Vec, ) -> Result, ObjectStorageError> { + if !dates.is_empty() { + // get current snapshot + let mut meta = storage.get_object_store_format(stream_name).await?; + let manifests = &mut meta.snapshot.manifest_list; + // Filter out items whose manifest_path contains any of the dates_to_delete + manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date))); + storage.put_snapshot(stream_name, meta.snapshot).await?; + } match CONFIG.parseable.mode { Mode::All | Mode::Ingest => { - if !dates.is_empty() { - // get current snapshot - let mut meta = storage.get_object_store_format(stream_name).await?; - let manifests = &mut meta.snapshot.manifest_list; - // Filter out items whose manifest_path contains any of the dates_to_delete - manifests - .retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date))); - storage.put_snapshot(stream_name, meta.snapshot).await?; - } - - let first_event_at = get_first_event(storage.clone(), stream_name, Vec::new()).await?; - - Ok(first_event_at) + Ok(get_first_event(storage.clone(), stream_name, Vec::new()).await?) } Mode::Query => Ok(get_first_event(storage, stream_name, dates).await?), } diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index f7974220c..433ec3651 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -338,6 +338,16 @@ pub async fn put_retention( pub async fn get_cache_enabled(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + + match CONFIG.parseable.mode { + Mode::Ingest | Mode::All => { + if CONFIG.parseable.local_cache_path.is_none() { + return Err(StreamError::CacheNotEnabled(stream_name)); + } + } + _ => {} + } + let cache_enabled = STREAM_INFO.cache_enabled(&stream_name)?; Ok((web::Json(cache_enabled), StatusCode::OK)) } diff --git a/server/src/query/stream_schema_provider.rs b/server/src/query/stream_schema_provider.rs index 17f796055..649f55ea2 100644 --- a/server/src/query/stream_schema_provider.rs +++ b/server/src/query/stream_schema_provider.rs @@ -328,10 +328,9 @@ impl TableProvider for StandardTableProvider { let obs = glob_storage .get_objects( Some(&path), - Box::new(|file_name| file_name.starts_with(".ingestor")), + Box::new(|file_name| file_name.ends_with("manifest.json")), ) .await; - if let Ok(obs) = obs { for ob in obs { if let Ok(object_store_format) = diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index 512a9c4c0..fa689f0b8 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -247,10 +247,16 @@ pub fn convert_disk_files_to_parquet( writer.close()?; for file in files { - if fs::remove_file(file).is_err() { + let file_size = file.metadata().unwrap().len(); + let file_type = file.extension().unwrap().to_str().unwrap(); + + if fs::remove_file(file.clone()).is_err() { log::error!("Failed to delete file. Unstable state"); process::abort() } + metrics::STORAGE_SIZE + .with_label_values(&["staging", stream, file_type]) + .sub(file_size as i64); } }