Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions server/src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::handlers::{
use crate::hottier::{HotTierManager, StreamHotTier};
use crate::metadata::STREAM_INFO;
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
use crate::option::validation::bytes_to_human_size;
use crate::option::{Mode, CONFIG};
use crate::static_schema::{convert_static_schema_to_arrow_schema, StaticSchema};
use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
Expand Down Expand Up @@ -960,10 +961,10 @@ pub async fn put_stream_hot_tier(

STREAM_INFO.set_hot_tier(&stream_name, true)?;
if let Some(hot_tier_manager) = HotTierManager::global() {
hot_tier_manager
let existing_hot_tier_used_size = hot_tier_manager
.validate_hot_tier_size(&stream_name, &hottier.size)
.await?;
hottier.used_size = Some("0GiB".to_string());
hottier.used_size = Some(bytes_to_human_size(existing_hot_tier_used_size));
hottier.available_size = Some(hottier.size.clone());
hot_tier_manager
.put_hot_tier(&stream_name, &mut hottier)
Expand Down
23 changes: 12 additions & 11 deletions server/src/hottier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,16 +111,18 @@ impl HotTierManager {
&self,
stream: &str,
size: &str,
) -> Result<(), HotTierError> {
) -> Result<u64, HotTierError> {
let mut existing_hot_tier_used_size = 0;
let mut existing_hot_tier_size = 0;
if self.check_stream_hot_tier_exists(stream) {
//delete existing hot tier if its size is less than the updated hot tier size else return error
let existing_hot_tier = self.get_hot_tier(stream).await?;
existing_hot_tier_used_size =
human_size_to_bytes(&existing_hot_tier.used_size.unwrap()).unwrap();
existing_hot_tier_size = human_size_to_bytes(&existing_hot_tier.size).unwrap();
if human_size_to_bytes(size) < human_size_to_bytes(&existing_hot_tier.size) {
return Err(HotTierError::ObjectStorageError(ObjectStorageError::Custom(format!(
"The hot tier size for the stream is already set to {} which is greater than the updated hot tier size of {}, reducing the hot tier size is not allowed",
"Reducing hot tier size is not supported, failed to reduce the hot tier size from {} to {}",
existing_hot_tier.size,
size
))));
Expand All @@ -129,7 +131,7 @@ impl HotTierManager {

let (total_disk_space, available_disk_space, used_disk_space) = get_disk_usage();

if let (Some(total_disk_space), Some(available_disk_space), Some(used_disk_space)) =
if let (Some(total_disk_space), _, Some(used_disk_space)) =
(total_disk_space, available_disk_space, used_disk_space)
{
let stream_hot_tier_size = human_size_to_bytes(size).unwrap();
Expand All @@ -140,20 +142,19 @@ impl HotTierManager {
- total_hot_tier_used_size;
let usage_percentage =
((projected_disk_usage as f64 / total_disk_space as f64) * 100.0).round();
let max_allowed_hot_tier_size =
((CONFIG.parseable.max_disk_usage * total_disk_space as f64) / 100.0)
- (used_disk_space + total_hot_tier_used_size + existing_hot_tier_used_size
- existing_hot_tier_size) as f64;

if usage_percentage > CONFIG.parseable.max_disk_usage {
return Err(HotTierError::ObjectStorageError(ObjectStorageError::Custom(format!(
"Including the hot tier size of all the streams, the projected disk usage will be {}% which is above the set threshold of {}%, hence unable to set the hot tier for the stream. Total Disk Size: {}, Available Disk Size: {}, Used Disk Size: {}, Total Hot Tier Size (all other streams): {}",
usage_percentage,
CONFIG.parseable.max_disk_usage,
bytes_to_human_size(total_disk_space),
bytes_to_human_size(available_disk_space),
bytes_to_human_size(used_disk_space),
bytes_to_human_size(total_hot_tier_size)
"{} is the total usable disk space for hot tier, cannot set a bigger value.", max_allowed_hot_tier_size
))));
}
}

Ok(())
Ok(existing_hot_tier_used_size)
}

///get the hot tier metadata file for the stream
Expand Down
9 changes: 5 additions & 4 deletions server/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,7 @@ pub fn user_name(username: &str) -> Result<(), UsernameValidationError> {

pub fn hot_tier(size: &str) -> Result<(), HotTierValidationError> {
if human_size_to_bytes(size).is_err() {
return Err(HotTierValidationError::Size(bytes_to_human_size(
MIN_STREAM_HOT_TIER_SIZE_BYTES,
)));
return Err(HotTierValidationError::InvalidFormat);
}

if human_size_to_bytes(size).unwrap() < MIN_STREAM_HOT_TIER_SIZE_BYTES {
Expand Down Expand Up @@ -213,7 +211,10 @@ pub mod error {

#[derive(Debug, thiserror::Error)]
pub enum HotTierValidationError {
#[error("Invalid size given for hot tier, please provide size in human readable format, e.g 1GiB, 2GiB, minimum {0}")]
#[error("Please provide size in human readable format, e.g 10GiB, 20GiB")]
InvalidFormat,

#[error("Stream should have atleast {0} size")]
Size(String),

#[error("While generating times for 'now' failed to parse duration")]
Expand Down