Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 82 additions & 81 deletions server/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ use std::io::Error as IOError;
pub mod column;
pub mod manifest;
pub mod snapshot;

use crate::storage::ObjectStoreFormat;
pub use manifest::create_from_parquet_file;

pub trait Snapshot {
fn manifests(&self, time_predicates: &[PartialTimeFilter]) -> Vec<ManifestItem>;
}
Expand Down Expand Up @@ -97,7 +96,6 @@ pub async fn update_snapshot(
// get current snapshot
let mut meta = storage.get_object_store_format(stream_name).await?;
let manifests = &mut meta.snapshot.manifest_list;

let (lower_bound, _) = get_file_bounds(&change);
let pos = manifests.iter().position(|item| {
item.time_lower_bound <= lower_bound && lower_bound < item.time_upper_bound
Expand All @@ -120,82 +118,89 @@ pub async fn update_snapshot(
}
}
if ch {
let Some(mut manifest) = storage.get_manifest(&path).await? else {
return Err(ObjectStorageError::UnhandledError(
"Manifest found in snapshot but not in object-storage"
.to_string()
.into(),
));
};
manifest.apply_change(change);
storage.put_manifest(&path, manifest).await?;
} else {
let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc();
let upper_bound = lower_bound
.date_naive()
.and_time(
NaiveTime::from_num_seconds_from_midnight_opt(
23 * 3600 + 59 * 60 + 59,
999_999_999,
)
.ok_or(IOError::new(
ErrorKind::Other,
"Failed to create upper bound for manifest",
))
.map_err(ObjectStorageError::IoError)?,
if let Some(mut manifest) = storage.get_manifest(&path).await? {
manifest.apply_change(change);
storage.put_manifest(&path, manifest).await?;
} else {
//instead of returning an error, create a new manifest (otherwise local to storage sync fails)
//but don't update the snapshot
create_manifest(
lower_bound,
change,
storage.clone(),
stream_name,
false,
meta,
)
.and_utc();

let manifest = Manifest {
files: vec![change],
..Manifest::default()
};

let mainfest_file_name = manifest_path("").to_string();
let path =
partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name);
storage
.put_object(&path, serde_json::to_vec(&manifest)?.into())
.await?;
let path = storage.absolute_url(&path);
let new_snapshot_entriy = snapshot::ManifestItem {
manifest_path: path.to_string(),
time_lower_bound: lower_bound,
time_upper_bound: upper_bound,
};
manifests.push(new_snapshot_entriy);
storage.put_snapshot(stream_name, meta.snapshot).await?;
}
} else {
create_manifest(
lower_bound,
change,
storage.clone(),
stream_name,
true,
meta,
)
.await?;
}
} else {
let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc();
let upper_bound = lower_bound
.date_naive()
.and_time(
NaiveTime::from_num_seconds_from_midnight_opt(
23 * 3600 + 59 * 60 + 59,
999_999_999,
)
.unwrap(),
)
.and_utc();
create_manifest(
lower_bound,
change,
storage.clone(),
stream_name,
true,
meta,
)
.await?;
}

let manifest = Manifest {
files: vec![change],
..Manifest::default()
};
Ok(())
}

let mainfest_file_name = manifest_path("").to_string();
let path = partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name);
storage
.put_object(&path, serde_json::to_vec(&manifest).unwrap().into())
.await?;
async fn create_manifest(
lower_bound: DateTime<Utc>,
change: manifest::File,
storage: Arc<dyn ObjectStorage + Send>,
stream_name: &str,
update_snapshot: bool,
mut meta: ObjectStoreFormat,
) -> Result<(), ObjectStorageError> {
let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc();
let upper_bound = lower_bound
.date_naive()
.and_time(
NaiveTime::from_num_seconds_from_midnight_opt(23 * 3600 + 59 * 60 + 59, 999_999_999)
.ok_or(IOError::new(
ErrorKind::Other,
"Failed to create upper bound for manifest",
))
.map_err(ObjectStorageError::IoError)?,
)
.and_utc();

let manifest = Manifest {
files: vec![change],
..Manifest::default()
};

let mainfest_file_name = manifest_path("").to_string();
let path = partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name);
storage
.put_object(&path, serde_json::to_vec(&manifest)?.into())
.await?;
if update_snapshot {
let mut manifests = meta.snapshot.manifest_list;
let path = storage.absolute_url(&path);
let new_snapshot_entriy = snapshot::ManifestItem {
let new_snapshot_entry = snapshot::ManifestItem {
manifest_path: path.to_string(),
time_lower_bound: lower_bound,
time_upper_bound: upper_bound,
};
manifests.push(new_snapshot_entriy);
manifests.push(new_snapshot_entry);
meta.snapshot.manifest_list = manifests;
storage.put_snapshot(stream_name, meta.snapshot).await?;
}

Expand All @@ -207,21 +212,17 @@ pub async fn remove_manifest_from_snapshot(
stream_name: &str,
dates: Vec<String>,
) -> Result<Option<String>, ObjectStorageError> {
if !dates.is_empty() {
// get current snapshot
let mut meta = storage.get_object_store_format(stream_name).await?;
let manifests = &mut meta.snapshot.manifest_list;
// Filter out items whose manifest_path contains any of the dates_to_delete
manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date)));
storage.put_snapshot(stream_name, meta.snapshot).await?;
}
match CONFIG.parseable.mode {
Mode::All | Mode::Ingest => {
if !dates.is_empty() {
// get current snapshot
let mut meta = storage.get_object_store_format(stream_name).await?;
let manifests = &mut meta.snapshot.manifest_list;
// Filter out items whose manifest_path contains any of the dates_to_delete
manifests
.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date)));
storage.put_snapshot(stream_name, meta.snapshot).await?;
}

let first_event_at = get_first_event(storage.clone(), stream_name, Vec::new()).await?;

Ok(first_event_at)
Ok(get_first_event(storage.clone(), stream_name, Vec::new()).await?)
}
Mode::Query => Ok(get_first_event(storage, stream_name, dates).await?),
}
Expand Down
10 changes: 10 additions & 0 deletions server/src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,16 @@ pub async fn put_retention(

pub async fn get_cache_enabled(req: HttpRequest) -> Result<impl Responder, StreamError> {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();

match CONFIG.parseable.mode {
Mode::Ingest | Mode::All => {
if CONFIG.parseable.local_cache_path.is_none() {
return Err(StreamError::CacheNotEnabled(stream_name));
}
}
_ => {}
}

let cache_enabled = STREAM_INFO.cache_enabled(&stream_name)?;
Ok((web::Json(cache_enabled), StatusCode::OK))
}
Expand Down
3 changes: 1 addition & 2 deletions server/src/query/stream_schema_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,10 +328,9 @@ impl TableProvider for StandardTableProvider {
let obs = glob_storage
.get_objects(
Some(&path),
Box::new(|file_name| file_name.starts_with(".ingestor")),
Box::new(|file_name| file_name.ends_with("manifest.json")),
)
.await;

if let Ok(obs) = obs {
for ob in obs {
if let Ok(object_store_format) =
Expand Down
8 changes: 7 additions & 1 deletion server/src/storage/staging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,16 @@ pub fn convert_disk_files_to_parquet(
writer.close()?;

for file in files {
if fs::remove_file(file).is_err() {
let file_size = file.metadata().unwrap().len();
let file_type = file.extension().unwrap().to_str().unwrap();

if fs::remove_file(file.clone()).is_err() {
log::error!("Failed to delete file. Unstable state");
process::abort()
}
metrics::STORAGE_SIZE
.with_label_values(&["staging", stream, file_type])
.sub(file_size as i64);
}
}

Expand Down