diff --git a/server/src/metadata.rs b/server/src/metadata.rs
index e173dd9e7..c7f923529 100644
--- a/server/src/metadata.rs
+++ b/server/src/metadata.rs
@@ -172,6 +172,7 @@ impl STREAM_INFO {
         self.read().unwrap().keys().map(String::clone).collect()
     }
 
+    #[allow(dead_code)]
     pub fn update_stats(
         &self,
         stream_name: &str,
diff --git a/server/src/storage.rs b/server/src/storage.rs
index 764b86823..d86663c65 100644
--- a/server/src/storage.rs
+++ b/server/src/storage.rs
@@ -100,28 +100,21 @@ pub trait ObjectStorage: Sync + 'static {
                 continue;
             }
 
-            match sync.move_local_to_temp() {
-                Ok(parquet_size) => {
-                    if let Err(e) = STREAM_INFO.update_stats(&stream, 0, parquet_size) {
-                        log::error!("Couldn't update stream stats. {:?}", e);
-                    }
-                }
-                Err(e) => {
-                    log::error!(
-                        "Error copying parquet from stream directory in [{}] to tmp directory [{}] due to error [{}]",
-                        sync.dir.data_path.to_string_lossy(),
-                        sync.dir.temp_dir.to_string_lossy(),
-                        e
-                    );
-                    continue;
-                }
+            if let Err(e) = sync.move_local_to_temp() {
+                log::error!(
+                    "Error moving data.records from stream directory in [{}] to tmp directory [{}] due to error [{}]",
+                    sync.dir.data_path.to_string_lossy(),
+                    sync.dir.temp_dir.to_string_lossy(),
+                    e
+                );
+                continue;
             }
         }
 
         Ok(())
     }
 
-    async fn s3_sync(&self) -> Result<(), ObjectStorageError> {
+    async fn s3_sync(&self) -> Result<(), MoveDataError> {
         if !Path::new(&CONFIG.parseable.local_disk_path).exists() {
             return Ok(());
         }
@@ -129,9 +122,10 @@ pub trait ObjectStorage: Sync + 'static {
         let streams = STREAM_INFO.list_streams();
 
         for stream in streams {
+            // get the staging directory for this stream
             let dir = StorageDir::new(stream.clone());
-
-            for file in WalkDir::new(dir.temp_dir)
+            // walk dir, find all .tmp files and convert to parquet
+            for file in WalkDir::new(&dir.temp_dir)
                 .min_depth(1)
                 .max_depth(1)
                 .into_iter()
@@ -144,7 +138,55 @@ pub trait ObjectStorage: Sync + 'static {
                     None => false,
                 };
 
-                    !is_tmp
+                    is_tmp
                 })
             {
+                let record_tmp_file = file.file_name().unwrap().to_str().unwrap();
+                let file = File::open(&file).map_err(|_| MoveDataError::Open)?;
+                let reader = StreamReader::try_new(file, None)?;
+                let schema = reader.schema();
+                let records = reader.filter_map(|record| match record {
+                    Ok(record) => Some(record),
+                    Err(e) => {
+                        log::warn!("error when reading from arrow stream {:?}", e);
+                        None
+                    }
+                });
+
+                let parquet_path = dir.temp_dir.join(
+                    record_tmp_file
+                        .strip_suffix(".tmp")
+                        .expect("file has a .tmp extension"),
+                );
+                let parquet_file =
+                    fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?;
+                let props = WriterProperties::builder().build();
+                let mut writer = ArrowWriter::try_new(parquet_file, schema, Some(props))?;
+
+                for ref record in records {
+                    writer.write(record)?;
+                }
+
+                writer.close()?;
+
+                fs::remove_file(dir.temp_dir.join(record_tmp_file))
+                    .map_err(|_| MoveDataError::Delete)?;
+            }
+
+            for file in WalkDir::new(dir.temp_dir)
+                .min_depth(1)
+                .max_depth(1)
+                .into_iter()
+                .filter_map(|file| file.ok())
+                .map(|file| file.path().to_path_buf())
+                .filter(|file| file.is_file())
+                .filter(|file| {
+                    let is_parquet = match file.extension() {
+                        Some(ext) => ext.eq_ignore_ascii_case("parquet"),
+                        None => false,
+                    };
+
+                    is_parquet
+                })
+            {
                 let filename = file.file_name().unwrap().to_str().unwrap();
@@ -195,40 +237,11 @@ impl StorageDir {
         fs::create_dir_all(&self.temp_dir)
     }
 
-    pub fn move_local_to_temp(&self, filename: String) -> Result<u64, MoveDataError> {
-        let record_tmp_file_path = self.temp_dir.join(filename.clone() + ".tmp");
-        fs::rename(self.data_path.join("data.records"), &record_tmp_file_path)
-            .map_err(|_| MoveDataError::Rename)?;
+    pub fn move_local_to_temp(&self, filename: String) -> io::Result<()> {
+        let record_tmp_file_path = self.temp_dir.join(filename + ".tmp");
+        fs::rename(self.data_path.join("data.records"), &record_tmp_file_path)?;
         event::STREAM_WRITERS::unset_entry(&self.stream_name).unwrap();
-        let file = File::open(&record_tmp_file_path).map_err(|_| MoveDataError::Open)?;
-        let reader = StreamReader::try_new(file, None)?;
-        let schema = reader.schema();
-        let records = reader.filter_map(|record| match record {
-            Ok(record) => Some(record),
-            Err(e) => {
-                log::warn!("error when reading from arrow stream {:?}", e);
-                None
-            }
-        });
-
-        let parquet_path = self.temp_dir.join(filename);
-        let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?;
-        let props = WriterProperties::builder().build();
-        let mut writer = ArrowWriter::try_new(parquet_file, schema, Some(props))?;
-
-        for ref record in records {
-            writer.write(record)?;
-        }
-
-        writer.close()?;
-
-        fs::remove_file(record_tmp_file_path).map_err(|_| MoveDataError::Delete)?;
-
-        let compressed_size = fs::metadata(parquet_path)
-            .map_err(|_| MoveDataError::Metadata)?
-            .len();
-
-        Ok(compressed_size)
+        Ok(())
     }
 
     pub fn local_data_exists(&self) -> bool {
@@ -238,20 +251,18 @@ impl StorageDir {
 
 #[derive(Debug, thiserror::Error)]
 pub enum MoveDataError {
-    #[error("Failed to rename file")]
-    Rename,
     #[error("Unable to Open file after moving")]
     Open,
     #[error("Unable to create recordbatch stream")]
     Arrow(#[from] ArrowError),
     #[error("Could not generate parquet file")]
     Parquet(#[from] ParquetError),
+    #[error("Object Storage Error {0}")]
+    ObjectStorage(#[from] ObjectStorageError),
     #[error("Could not generate parquet file")]
     Create,
     #[error("Could not delete temp arrow file")]
     Delete,
-    #[error("Could not fetch metadata of moved parquet file")]
-    Metadata,
 }
 
 struct StorageSync {
@@ -266,7 +277,7 @@ impl StorageSync {
         Self { dir, time }
     }
 
-    fn move_local_to_temp(&self) -> Result<u64, MoveDataError> {
+    fn move_local_to_temp(&self) -> io::Result<()> {
         let time = self.time - Duration::minutes(OBJECT_STORE_DATA_GRANULARITY as i64);
         let uri = utils::date_to_prefix(time.date())
             + &utils::hour_to_prefix(time.hour())
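Reviewer note on the refactor: `StorageDir::move_local_to_temp` is now a plain rename of `data.records` into the temp dir (hence the simpler `io::Result<()>` return), while the arrow-to-parquet conversion moves into `s3_sync`, which first converts every `.tmp` arrow stream file in the temp dir and then uploads the resulting `.parquet` files. Since the compressed size is no longer known at move time, the `update_stats` call is dropped and the method is kept behind `#[allow(dead_code)]`. Below is a minimal, self-contained sketch of the conversion step using the same arrow/parquet calls as the patch; the function name `convert_tmp_to_parquet` and the boxed error type are illustrative only, not part of this change:

```rust
use std::fs::{self, File};
use std::path::Path;

use arrow::ipc::reader::StreamReader;
use parquet::arrow::arrow_writer::ArrowWriter;
use parquet::file::properties::WriterProperties;

/// Illustrative sketch (not part of the patch): rewrite an arrow IPC
/// stream file `<name>.tmp` as `<name>` in parquet format, deleting the
/// source only after the writer closes cleanly.
fn convert_tmp_to_parquet(tmp_path: &Path) -> Result<(), Box<dyn std::error::Error>> {
    // Open the staged arrow stream and capture its schema up front.
    let reader = StreamReader::try_new(File::open(tmp_path)?, None)?;
    let schema = reader.schema();

    // Strip the trailing ".tmp" to get the parquet target path.
    let parquet_path = tmp_path.with_extension("");
    let props = WriterProperties::builder().build();
    let mut writer = ArrowWriter::try_new(File::create(&parquet_path)?, schema, Some(props))?;

    // Stream record batches across; `flatten` skips batches that fail to
    // decode, mirroring the `filter_map` in the patch.
    for batch in reader.flatten() {
        writer.write(&batch)?;
    }
    writer.close()?;

    // Remove the arrow file only once the parquet file is fully written.
    fs::remove_file(tmp_path)?;
    Ok(())
}
```

One consequence worth flagging: `s3_sync` now returns `MoveDataError`, and the new `ObjectStorage(#[from] ObjectStorageError)` variant exists presumably so the existing upload errors can still propagate through it with `?`.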