1 change: 1 addition & 0 deletions server/src/metadata.rs
@@ -172,6 +172,7 @@ impl STREAM_INFO {
self.read().unwrap().keys().map(String::clone).collect()
}

#[allow(dead_code)]
pub fn update_stats(
&self,
stream_name: &str,
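The new `#[allow(dead_code)]` keeps `update_stats` compiling even though the sync loop in storage.rs (below) no longer calls it. For orientation, here is a minimal sketch of the shape the visible lines suggest — `STREAM_INFO` guarding a map of streams behind an `RwLock` — where the `Stats` fields and the error type are assumptions, not anything shown in this diff:

```rust
use std::collections::HashMap;
use std::sync::RwLock;

// Hypothetical per-stream stats; the real value type is not visible in this diff.
#[derive(Default)]
pub struct Stats {
    pub events_ingested: u64,
    pub parquet_size: u64,
}

// Assumed shape: `self.read().unwrap().keys()` in the hunk above suggests
// STREAM_INFO wraps an RwLock<HashMap<String, ..>> keyed by stream name.
pub struct StreamInfo(RwLock<HashMap<String, Stats>>);

impl StreamInfo {
    pub fn update_stats(
        &self,
        stream_name: &str,
        size: u64,
        parquet_size: u64,
    ) -> Result<(), String> {
        let mut map = self.0.write().unwrap();
        let stats = map
            .get_mut(stream_name)
            .ok_or_else(|| format!("unknown stream {stream_name}"))?;
        stats.events_ingested += size;
        stats.parquet_size += parquet_size;
        Ok(())
    }
}
```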
125 changes: 68 additions & 57 deletions server/src/storage.rs
@@ -100,38 +100,32 @@ pub trait ObjectStorage: Sync + 'static {
continue;
}

match sync.move_local_to_temp() {
Ok(parquet_size) => {
if let Err(e) = STREAM_INFO.update_stats(&stream, 0, parquet_size) {
log::error!("Couldn't update stream stats. {:?}", e);
}
}
Err(e) => {
log::error!(
"Error copying parquet from stream directory in [{}] to tmp directory [{}] due to error [{}]",
sync.dir.data_path.to_string_lossy(),
sync.dir.temp_dir.to_string_lossy(),
e
);
continue;
}
if let Err(e) = sync.move_local_to_temp() {
log::error!(
"Error copying parquet from stream directory in [{}] to tmp directory [{}] due to error [{}]",
sync.dir.data_path.to_string_lossy(),
sync.dir.temp_dir.to_string_lossy(),
e
);
continue;
}
}

Ok(())
}

async fn s3_sync(&self) -> Result<(), ObjectStorageError> {
async fn s3_sync(&self) -> Result<(), MoveDataError> {
if !Path::new(&CONFIG.parseable.local_disk_path).exists() {
return Ok(());
}

let streams = STREAM_INFO.list_streams();

for stream in streams {
// resolve this stream's local storage directory
let dir = StorageDir::new(stream.clone());

for file in WalkDir::new(dir.temp_dir)
// walk dir, find all .tmp files and convert to parquet
for file in WalkDir::new(&dir.temp_dir)
.min_depth(1)
.max_depth(1)
.into_iter()
@@ -144,7 +138,55 @@ pub trait ObjectStorage: Sync + 'static {
None => false,
};

!is_tmp
is_tmp
})
{
let record_tmp_file = file.file_name().unwrap().to_str().unwrap();
let file = File::open(&file).map_err(|_| MoveDataError::Open)?;
let reader = StreamReader::try_new(file, None)?;
let schema = reader.schema();
let records = reader.filter_map(|record| match record {
Ok(record) => Some(record),
Err(e) => {
log::warn!("error when reading from arrow stream {:?}", e);
None
}
});

let parquet_path = dir.temp_dir.join(
record_tmp_file
.strip_suffix(".tmp")
.expect("file has a .tmp extention"),
);
let parquet_file =
fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?;
let props = WriterProperties::builder().build();
let mut writer = ArrowWriter::try_new(parquet_file, schema, Some(props))?;

for ref record in records {
writer.write(record)?;
}

writer.close()?;

fs::remove_file(dir.temp_dir.join(record_tmp_file))
.map_err(|_| MoveDataError::Delete)?;
}

for file in WalkDir::new(dir.temp_dir)
.min_depth(1)
.max_depth(1)
.into_iter()
.filter_map(|file| file.ok())
.map(|file| file.path().to_path_buf())
.filter(|file| file.is_file())
.filter(|file| {
let is_parquet = match file.extension() {
Some(ext) => ext.eq_ignore_ascii_case("parquet"),
None => false,
};

is_parquet
})
{
let filename = file.file_name().unwrap().to_str().unwrap();
Expand Down Expand Up @@ -195,40 +237,11 @@ impl StorageDir {
fs::create_dir_all(&self.temp_dir)
}

pub fn move_local_to_temp(&self, filename: String) -> Result<u64, MoveDataError> {
let record_tmp_file_path = self.temp_dir.join(filename.clone() + ".tmp");
fs::rename(self.data_path.join("data.records"), &record_tmp_file_path)
.map_err(|_| MoveDataError::Rename)?;
pub fn move_local_to_temp(&self, filename: String) -> io::Result<()> {
let record_tmp_file_path = self.temp_dir.join(filename + ".tmp");
fs::rename(self.data_path.join("data.records"), &record_tmp_file_path)?;
event::STREAM_WRITERS::unset_entry(&self.stream_name).unwrap();
let file = File::open(&record_tmp_file_path).map_err(|_| MoveDataError::Open)?;
let reader = StreamReader::try_new(file, None)?;
let schema = reader.schema();
let records = reader.filter_map(|record| match record {
Ok(record) => Some(record),
Err(e) => {
log::warn!("error when reading from arrow stream {:?}", e);
None
}
});

let parquet_path = self.temp_dir.join(filename);
let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?;
let props = WriterProperties::builder().build();
let mut writer = ArrowWriter::try_new(parquet_file, schema, Some(props))?;

for ref record in records {
writer.write(record)?;
}

writer.close()?;

fs::remove_file(record_tmp_file_path).map_err(|_| MoveDataError::Delete)?;

let compressed_size = fs::metadata(parquet_path)
.map_err(|_| MoveDataError::Metadata)?
.len();

Ok(compressed_size)
Ok(())
}

pub fn local_data_exists(&self) -> bool {
@@ -238,20 +251,18 @@ impl StorageDir {

#[derive(Debug, thiserror::Error)]
pub enum MoveDataError {
#[error("Failed to rename file")]
Rename,
#[error("Unable to Open file after moving")]
Open,
#[error("Unable to create recordbatch stream")]
Arrow(#[from] ArrowError),
#[error("Could not generate parquet file")]
Parquet(#[from] ParquetError),
#[error("Object Storage Error {0}")]
ObjectStorage(#[from] ObjectStorageError),
#[error("Could not generate parquet file")]
Create,
#[error("Could not delete temp arrow file")]
Delete,
#[error("Could not fetch metadata of moved parquet file")]
Metadata,
}

struct StorageSync {
@@ -266,7 +277,7 @@ impl StorageSync {
Self { dir, time }
}

fn move_local_to_temp(&self) -> Result<u64, MoveDataError> {
fn move_local_to_temp(&self) -> io::Result<()> {
let time = self.time - Duration::minutes(OBJECT_STORE_DATA_GRANULARITY as i64);
let uri = utils::date_to_prefix(time.date())
+ &utils::hour_to_prefix(time.hour())
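Taken together, the storage.rs changes split the old `move_local_to_temp` in two: `StorageDir` keeps only the rename of `data.records` to a timestamped `.tmp` file, while the arrow-to-parquet conversion moves into the first directory walk in `s3_sync`. Since the compressed-size calculation goes with it, the function now returns `io::Result<()>`, the `Rename` and `Metadata` variants drop out of `MoveDataError`, and the `update_stats` call loses its caller. A standalone sketch of the relocated conversion step, assuming the `arrow::ipc::reader::StreamReader` and `parquet::arrow::ArrowWriter` APIs the calls suggest (the diff does not show its imports), with error handling collapsed into a boxed error:

```rust
use std::fs::{self, File};
use std::path::{Path, PathBuf};

use arrow::ipc::reader::StreamReader;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

/// Turn one `<name>.tmp` arrow stream file into its parquet sibling and
/// delete the `.tmp` file on success, mirroring the loop added to `s3_sync`.
fn convert_tmp_to_parquet(tmp_path: &Path) -> Result<PathBuf, Box<dyn std::error::Error>> {
    let reader = StreamReader::try_new(File::open(tmp_path)?, None)?;
    let schema = reader.schema();

    // Strip the ".tmp" suffix to recover the target parquet filename,
    // as the diff does with `strip_suffix(".tmp")`.
    let stem = tmp_path
        .file_name()
        .and_then(|name| name.to_str())
        .and_then(|name| name.strip_suffix(".tmp"))
        .ok_or("expected a .tmp file")?;
    let parquet_path = tmp_path.with_file_name(stem);

    let mut writer = ArrowWriter::try_new(
        File::create(&parquet_path)?,
        schema,
        Some(WriterProperties::builder().build()),
    )?;

    // Skip unreadable batches rather than failing the whole sync,
    // matching the filter_map in the diff.
    for batch in reader.filter_map(|batch| batch.ok()) {
        writer.write(&batch)?;
    }
    writer.close()?;

    fs::remove_file(tmp_path)?;
    Ok(parquet_path)
}
```

Skipping unreadable batches instead of propagating the error means a corrupt tail left by a crash cannot wedge the sync loop; the trade-off is that those batches are dropped silently, surfaced only as the `log::warn!` the diff emits.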