From 370c110523ae7b9e06d8bba4726e66cc6df5b747 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Fri, 16 Sep 2022 10:51:31 +0530 Subject: [PATCH 1/2] Delgate parquet creation to s3sync instead --- server/src/metadata.rs | 1 + server/src/storage.rs | 123 ++++++++++++++++++++++------------------- 2 files changed, 68 insertions(+), 56 deletions(-) diff --git a/server/src/metadata.rs b/server/src/metadata.rs index e173dd9e7..c7f923529 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -172,6 +172,7 @@ impl STREAM_INFO { self.read().unwrap().keys().map(String::clone).collect() } + #[allow(dead_code)] pub fn update_stats( &self, stream_name: &str, diff --git a/server/src/storage.rs b/server/src/storage.rs index 764b86823..b583b9d86 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -100,28 +100,21 @@ pub trait ObjectStorage: Sync + 'static { continue; } - match sync.move_local_to_temp() { - Ok(parquet_size) => { - if let Err(e) = STREAM_INFO.update_stats(&stream, 0, parquet_size) { - log::error!("Couldn't update stream stats. {:?}", e); - } - } - Err(e) => { - log::error!( - "Error copying parquet from stream directory in [{}] to tmp directory [{}] due to error [{}]", - sync.dir.data_path.to_string_lossy(), - sync.dir.temp_dir.to_string_lossy(), - e - ); - continue; - } + if let Err(e) = sync.move_local_to_temp() { + log::error!( + "Error copying parquet from stream directory in [{}] to tmp directory [{}] due to error [{}]", + sync.dir.data_path.to_string_lossy(), + sync.dir.temp_dir.to_string_lossy(), + e + ); + continue; } } Ok(()) } - async fn s3_sync(&self) -> Result<(), ObjectStorageError> { + async fn s3_sync(&self) -> Result<(), MoveDataError> { if !Path::new(&CONFIG.parseable.local_disk_path).exists() { return Ok(()); } @@ -129,9 +122,10 @@ pub trait ObjectStorage: Sync + 'static { let streams = STREAM_INFO.list_streams(); for stream in streams { + // get dir let dir = StorageDir::new(stream.clone()); - - for file in WalkDir::new(dir.temp_dir) + // walk dir, find all .tmp files and convert to parquet + for file in WalkDir::new(&dir.temp_dir) .min_depth(1) .max_depth(1) .into_iter() @@ -144,7 +138,55 @@ pub trait ObjectStorage: Sync + 'static { None => false, }; - !is_tmp + is_tmp + }) + { + let record_tmp_file = file.file_name().unwrap().to_str().unwrap(); + let file = File::open(&file).map_err(|_| MoveDataError::Open)?; + let reader = StreamReader::try_new(file, None)?; + let schema = reader.schema(); + let records = reader.filter_map(|record| match record { + Ok(record) => Some(record), + Err(e) => { + log::warn!("error when reading from arrow stream {:?}", e); + None + } + }); + + let parquet_path = dir.temp_dir.join( + record_tmp_file + .strip_suffix(".tmp") + .expect("file has a .tmp extention"), + ); + let parquet_file = + fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?; + let props = WriterProperties::builder().build(); + let mut writer = ArrowWriter::try_new(parquet_file, schema, Some(props))?; + + for ref record in records { + writer.write(record)?; + } + + writer.close()?; + + fs::remove_file(dir.temp_dir.join(record_tmp_file)) + .map_err(|_| MoveDataError::Delete)?; + } + + for file in WalkDir::new(dir.temp_dir) + .min_depth(1) + .max_depth(1) + .into_iter() + .filter_map(|file| file.ok()) + .map(|file| file.path().to_path_buf()) + .filter(|file| file.is_file()) + .filter(|file| { + let is_parquet = match file.extension() { + Some(ext) => ext.eq_ignore_ascii_case("parquet"), + None => false, + }; + + is_parquet }) { let filename = file.file_name().unwrap().to_str().unwrap(); @@ -195,40 +237,11 @@ impl StorageDir { fs::create_dir_all(&self.temp_dir) } - pub fn move_local_to_temp(&self, filename: String) -> Result { + pub fn move_local_to_temp(&self, filename: String) -> io::Result<()> { let record_tmp_file_path = self.temp_dir.join(filename.clone() + ".tmp"); - fs::rename(self.data_path.join("data.records"), &record_tmp_file_path) - .map_err(|_| MoveDataError::Rename)?; + fs::rename(self.data_path.join("data.records"), &record_tmp_file_path)?; event::STREAM_WRITERS::unset_entry(&self.stream_name).unwrap(); - let file = File::open(&record_tmp_file_path).map_err(|_| MoveDataError::Open)?; - let reader = StreamReader::try_new(file, None)?; - let schema = reader.schema(); - let records = reader.filter_map(|record| match record { - Ok(record) => Some(record), - Err(e) => { - log::warn!("error when reading from arrow stream {:?}", e); - None - } - }); - - let parquet_path = self.temp_dir.join(filename); - let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?; - let props = WriterProperties::builder().build(); - let mut writer = ArrowWriter::try_new(parquet_file, schema, Some(props))?; - - for ref record in records { - writer.write(record)?; - } - - writer.close()?; - - fs::remove_file(record_tmp_file_path).map_err(|_| MoveDataError::Delete)?; - - let compressed_size = fs::metadata(parquet_path) - .map_err(|_| MoveDataError::Metadata)? - .len(); - - Ok(compressed_size) + Ok(()) } pub fn local_data_exists(&self) -> bool { @@ -238,20 +251,18 @@ impl StorageDir { #[derive(Debug, thiserror::Error)] pub enum MoveDataError { - #[error("Failed to rename file")] - Rename, #[error("Unable to Open file after moving")] Open, #[error("Unable to create recordbatch stream")] Arrow(#[from] ArrowError), #[error("Could not generate parquet file")] Parquet(#[from] ParquetError), + #[error("Object Storage Error {0}")] + ObjectStorag(#[from] ObjectStorageError), #[error("Could not generate parquet file")] Create, #[error("Could not delete temp arrow file")] Delete, - #[error("Could not fetch metadata of moved parquet file")] - Metadata, } struct StorageSync { @@ -266,7 +277,7 @@ impl StorageSync { Self { dir, time } } - fn move_local_to_temp(&self) -> Result { + fn move_local_to_temp(&self) -> io::Result<()> { let time = self.time - Duration::minutes(OBJECT_STORE_DATA_GRANULARITY as i64); let uri = utils::date_to_prefix(time.date()) + &utils::hour_to_prefix(time.hour()) From 4c38b401bafa0290d002672a22a831d8b3c13df1 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Tue, 20 Sep 2022 14:17:09 +0530 Subject: [PATCH 2/2] Remove clone --- server/src/storage.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/storage.rs b/server/src/storage.rs index b583b9d86..d86663c65 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -238,7 +238,7 @@ impl StorageDir { } pub fn move_local_to_temp(&self, filename: String) -> io::Result<()> { - let record_tmp_file_path = self.temp_dir.join(filename.clone() + ".tmp"); + let record_tmp_file_path = self.temp_dir.join(filename + ".tmp"); fs::rename(self.data_path.join("data.records"), &record_tmp_file_path)?; event::STREAM_WRITERS::unset_entry(&self.stream_name).unwrap(); Ok(())