From 6c50ea62881b6213682fd7347fc910246025f9ea Mon Sep 17 00:00:00 2001 From: Aditya Date: Tue, 20 May 2025 15:24:52 +0530 Subject: [PATCH 1/2] feat(types): Added LimitsConfig field to ComponentProperties for granular per-component resource control Signed-off-by: Aditya --- crates/wadm-types/src/bindings.rs | 2 ++ crates/wadm-types/src/lib.rs | 18 ++++++++++++++++++ crates/wadm-types/wit/deps/wadm/types.wit | 11 +++++++++++ 3 files changed, 31 insertions(+) diff --git a/crates/wadm-types/src/bindings.rs b/crates/wadm-types/src/bindings.rs index 037c09dd..9b8c97ba 100644 --- a/crates/wadm-types/src/bindings.rs +++ b/crates/wadm-types/src/bindings.rs @@ -113,6 +113,7 @@ impl From for wadm::types::ComponentProperties { id: properties.id, config: properties.config.into_iter().map(|c| c.into()).collect(), secrets: properties.secrets.into_iter().map(|c| c.into()).collect(), + limits: properties.limits.into_iter().map(|c| c.into()).collect(), } } } @@ -431,6 +432,7 @@ impl From for ComponentProperties { id: properties.id, config: properties.config.into_iter().map(|c| c.into()).collect(), secrets: properties.secrets.into_iter().map(|c| c.into()).collect(), + limits: properties.limits.map(Into::into), } } } diff --git a/crates/wadm-types/src/lib.rs b/crates/wadm-types/src/lib.rs index 11377bf0..092e06b6 100644 --- a/crates/wadm-types/src/lib.rs +++ b/crates/wadm-types/src/lib.rs @@ -298,6 +298,9 @@ pub struct ComponentProperties { /// these values at runtime using `wasmcloud:secrets/store`. #[serde(default, skip_serializing_if = "Vec::is_empty")] pub secrets: Vec, + /// This Config holds the component's metadata properties like memory limits and execution time limits + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub limits: Vec, } #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default, ToSchema, JsonSchema)] @@ -333,6 +336,20 @@ pub struct SecretSourceProperty { pub version: Option, } +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Hash, ToSchema, JsonSchema)] +pub struct LimitProperties { + #[serde(skip_serializing_if = "Option::is_none")] + pub max_linear_memory: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub max_execution_time: Option, +} + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Hash, ToSchema, JsonSchema)] +pub struct LimitsConfig { + pub name: String, + pub properties: LimitProperties, +} + #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, ToSchema, JsonSchema)] #[serde(deny_unknown_fields)] pub struct CapabilityProperties { @@ -845,6 +862,7 @@ mod test { id: None, config: vec![], secrets: vec![], + limits: vec![], }, }, traits: Some(trait_vec), diff --git a/crates/wadm-types/wit/deps/wadm/types.wit b/crates/wadm-types/wit/deps/wadm/types.wit index ef50ad68..9a2560fd 100644 --- a/crates/wadm-types/wit/deps/wadm/types.wit +++ b/crates/wadm-types/wit/deps/wadm/types.wit @@ -123,6 +123,17 @@ interface types { id: option, config: list, secrets: list, + limits: list, + } + + record limits-config { + name: string, + properties: limit-properties, + } + + record limit-properties { + max-linear-memory: option, + max-execution-time: option, } // Properties for a capability From fdeee05d43e4ca7a79b5d0141bb0cf443e6d3d1a Mon Sep 17 00:00:00 2001 From: Aditya Date: Fri, 11 Jul 2025 02:58:26 +0530 Subject: [PATCH 2/2] feat(wadm): Add Component resource limits support - Modify LimitsConfig to wadm-types for uni-config scenarios only - Update ScaleComponent command to include limits field - Add compute_limits function in scaler/convert.rs using parse-size crate for validation - Support human-readable formats like 1GB, 512MB, 30s for manifest configuration~ Signed-off-by: Aditya --- Cargo.lock | 18 +++-- crates/wadm-types/src/lib.rs | 6 +- crates/wadm/Cargo.toml | 2 + crates/wadm/src/commands/mod.rs | 4 + crates/wadm/src/events/types.rs | 1 + crates/wadm/src/scaler/configscaler.rs | 3 +- crates/wadm/src/scaler/convert.rs | 84 ++++++++++++++++++++- crates/wadm/src/scaler/daemonscaler/mod.rs | 25 +++++- crates/wadm/src/scaler/spreadscaler/link.rs | 1 + crates/wadm/src/scaler/spreadscaler/mod.rs | 46 +++++++++-- crates/wadm/src/storage/reaper.rs | 5 ++ crates/wadm/src/storage/state.rs | 3 + crates/wadm/src/workers/command.rs | 2 +- crates/wadm/src/workers/event.rs | 14 +++- tests/command_consumer_integration.rs | 2 + tests/command_worker_integration.rs | 3 + tests/storage_nats_kv.rs | 6 ++ 17 files changed, 206 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 462930d5..54bd454d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -753,7 +753,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33d852cb9b869c2a9b3df2f71a3074817f01e1844f839a144f5fcef059a4eb5d" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -1903,6 +1903,12 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "parse-size" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "487f2ccd1e17ce8c1bfab3a65c89525af41cfad4c8659021a1e9a2aacd73b89b" + [[package]] name = "pem-rfc7468" version = "0.7.0" @@ -2147,7 +2153,7 @@ dependencies = [ "once_cell", "socket2", "tracing", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -2412,7 +2418,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.4.15", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -2425,7 +2431,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.9.3", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -3036,7 +3042,7 @@ dependencies = [ "getrandom 0.3.2", "once_cell", "rustix 1.0.3", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -3699,10 +3705,12 @@ dependencies = [ "futures", "http 1.3.1", "http-body-util", + "humantime", "hyper 1.6.0", "hyper-util", "indexmap 2.9.0", "nkeys", + "parse-size", "semver", "serde", "serde_json", diff --git a/crates/wadm-types/src/lib.rs b/crates/wadm-types/src/lib.rs index 092e06b6..b6119ca6 100644 --- a/crates/wadm-types/src/lib.rs +++ b/crates/wadm-types/src/lib.rs @@ -299,8 +299,8 @@ pub struct ComponentProperties { #[serde(default, skip_serializing_if = "Vec::is_empty")] pub secrets: Vec, /// This Config holds the component's metadata properties like memory limits and execution time limits - #[serde(default, skip_serializing_if = "Vec::is_empty")] - pub limits: Vec, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub limits: Option, } #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default, ToSchema, JsonSchema)] @@ -862,7 +862,7 @@ mod test { id: None, config: vec![], secrets: vec![], - limits: vec![], + limits: None, }, }, traits: Some(trait_vec), diff --git a/crates/wadm/Cargo.toml b/crates/wadm/Cargo.toml index bdf52fee..13cacc92 100644 --- a/crates/wadm/Cargo.toml +++ b/crates/wadm/Cargo.toml @@ -46,6 +46,8 @@ uuid = { workspace = true } wadm-types = { workspace = true } wasmcloud-control-interface = { workspace = true } wasmcloud-secrets-types = { workspace = true } +humantime = "2.2.0" +parse-size = "1.1.0" [dev-dependencies] serial_test = "3" diff --git a/crates/wadm/src/commands/mod.rs b/crates/wadm/src/commands/mod.rs index e8a4ac21..4bed8078 100644 --- a/crates/wadm/src/commands/mod.rs +++ b/crates/wadm/src/commands/mod.rs @@ -78,6 +78,7 @@ impl Command { component_id, host_id, count, + limits, reference, annotations, model_name, @@ -94,6 +95,7 @@ impl Command { annotations: annotations.to_owned(), // We don't know this field from the command claims: None, + limits: limits.clone(), }), Some(Event::ComponentScaleFailed(ComponentScaleFailed { component_id: component_id.to_owned(), @@ -122,6 +124,8 @@ pub struct ScaleComponent { pub host_id: String, /// The number of components to scale to pub count: u32, + /// The limits for the component, if any + pub limits: Option>, /// The OCI or bindle reference to scale pub reference: String, /// The name of the model/manifest that generated this command diff --git a/crates/wadm/src/events/types.rs b/crates/wadm/src/events/types.rs index a06c70d6..d2fa7902 100644 --- a/crates/wadm/src/events/types.rs +++ b/crates/wadm/src/events/types.rs @@ -300,6 +300,7 @@ pub struct ComponentScaled { pub claims: Option, pub image_ref: String, pub max_instances: usize, + pub limits: Option>, pub component_id: String, #[serde(default)] pub host_id: String, diff --git a/crates/wadm/src/scaler/configscaler.rs b/crates/wadm/src/scaler/configscaler.rs index 245757c6..b11c7929 100644 --- a/crates/wadm/src/scaler/configscaler.rs +++ b/crates/wadm/src/scaler/configscaler.rs @@ -279,7 +279,8 @@ mod test { image_ref: "foo".to_string(), max_instances: 0, component_id: "fooo".to_string(), - host_id: "hostid".to_string() + host_id: "hostid".to_string(), + limits: None, })) .await .expect("handle_event should succeed"), diff --git a/crates/wadm/src/scaler/convert.rs b/crates/wadm/src/scaler/convert.rs index d57fe314..629ba61c 100644 --- a/crates/wadm/src/scaler/convert.rs +++ b/crates/wadm/src/scaler/convert.rs @@ -4,6 +4,7 @@ use std::{collections::HashMap, time::Duration}; use anyhow::Result; +use parse_size::parse_size; use tracing::{error, warn}; use wadm_types::{ api::StatusInfo, CapabilityProperties, Component, ComponentProperties, ConfigProperty, @@ -195,7 +196,14 @@ fn component_scalers( &properties.secrets, policies, ); - + // write a function call here that converts the limits from componentProperties and unmarshalls them into the correct format + // It first checks if the limts are set in their correct foormat, ie humantime for max_execution_time and human readable memory + // for max_memory_limits like "4gb" or so + // if successful, they are set in a hashmap with the keys "max_execution_time" and "max_memory_limits" and then passed to the scalers. + let limits = properties + .limits + .as_ref() + .and_then(|_| compute_limits(properties)); config_names.append(&mut secret_names.clone()); // TODO(#451): Consider a way to report on status of a shared component match (trt.trait_type.as_str(), &trt.properties, &properties.image) { @@ -226,6 +234,7 @@ fn component_scalers( p.to_owned(), component_name, config_names, + limits, ), notifier.clone(), config_scalers, @@ -246,6 +255,7 @@ fn component_scalers( p.to_owned(), component_name, config_names, + limits, ), notifier.clone(), config_scalers, @@ -741,6 +751,78 @@ fn resolve_manifest_component<'a>( } } +// add new function to compute the limits from componentProperties and unmarshall them into the correct format +// It first checks if the limits are set in their correct format, ie humantime for max_execution_time and human readable memory +/// Parse and validate memory size from human-readable format to bytes +fn parse_and_validate_memory_size(input: &str) -> Result { + let bytes = parse_size(input).map_err(|e| format!("Parse error: {}", e))?; + + if bytes > usize::MAX as u64 { + Err(format!( + "Size too large: max allowed is {} bytes (got {} bytes)", + usize::MAX, + bytes + )) + } else { + Ok(bytes as usize) + } +} + +/// Parse execution time from humantime format to seconds +fn parse_and_validate_execution_time(input: &str) -> Result { + humantime::parse_duration(input) + .map(|duration| duration.as_secs()) + .map_err(|e| format!("Invalid time format: {}", e)) +} + +/// Computes limits for components from ComponentProperties +/// Converts human-readable formats to numeric values for the control interface +pub(crate) fn compute_limits(properties: &ComponentProperties) -> Option> { + let mut limits = HashMap::new(); + + if let Some(max_execution_time) = properties + .limits + .as_ref() + .and_then(|l| l.properties.max_execution_time.as_ref()) + { + match parse_and_validate_execution_time(max_execution_time) { + Ok(seconds) => { + limits.insert("max_execution_time".to_string(), seconds.to_string()); + } + Err(e) => { + warn!( + "Invalid max_execution_time format '{}': {}", + max_execution_time, e + ); + } + } + } + + if let Some(max_memory_limits) = &properties + .limits + .as_ref() + .and_then(|l| l.properties.max_linear_memory.as_ref()) + { + match parse_and_validate_memory_size(max_memory_limits) { + Ok(bytes) => { + limits.insert("max_memory_limit".to_string(), bytes.to_string()); + } + Err(e) => { + warn!( + "Invalid max_memory_limits format '{}': {}", + max_memory_limits, e + ); + } + } + } + + if limits.is_empty() { + None + } else { + Some(limits) + } +} + #[cfg(test)] mod test { use super::compute_component_id; diff --git a/crates/wadm/src/scaler/daemonscaler/mod.rs b/crates/wadm/src/scaler/daemonscaler/mod.rs index 0536c5a8..2e4742e9 100644 --- a/crates/wadm/src/scaler/daemonscaler/mod.rs +++ b/crates/wadm/src/scaler/daemonscaler/mod.rs @@ -1,5 +1,5 @@ use std::cmp::Ordering; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use anyhow::Result; use async_trait::async_trait; @@ -38,6 +38,8 @@ struct ComponentSpreadConfig { model_name: String, /// Configuration for this DaemonScaler spread_config: SpreadScalerProperty, + /// The limits for the respecctive component, if any + limits: Option>, } /// The ComponentDaemonScaler ensures that a certain number of instances are running on every host, according to a @@ -159,6 +161,7 @@ impl Scaler for ComponentDaemonScaler { model_name: self.spread_config.model_name.to_owned(), annotations: BTreeMap::new(), config: self.config.clone(), + limits: self.spread_config.limits.clone(), })) } else { None @@ -242,6 +245,7 @@ impl Scaler for ComponentDaemonScaler { self.id(), ), config: self.config.clone(), + limits: self.spread_config.limits.clone(), })) } } @@ -313,6 +317,7 @@ impl ComponentDaemonScaler { spread_config: SpreadScalerProperty, component_name: &str, config: Vec, + limits: Option>, ) -> Self { // Compute the id of this scaler based on all of the configuration values // that make it unique. This is used during upgrades to determine if a @@ -344,6 +349,7 @@ impl ComponentDaemonScaler { lattice_id, spread_config, model_name, + limits, }, id, status: RwLock::new(StatusInfo::reconciling("")), @@ -441,6 +447,7 @@ mod test { complex_spread, "fake_component", vec![], + None, ); let cmds = daemonscaler.reconcile().await?; @@ -453,6 +460,7 @@ mod test { model_name: MODEL_NAME.to_string(), annotations: spreadscaler_annotations("ComplexOne", daemonscaler.id()), config: vec![], + limits: None, }))); assert!(cmds.contains(&Command::ScaleComponent(ScaleComponent { component_id: component_id.to_string(), @@ -462,6 +470,7 @@ mod test { model_name: MODEL_NAME.to_string(), annotations: spreadscaler_annotations("ComplexTwo", daemonscaler.id()), config: vec![], + limits: None, }))); assert!(cmds.contains(&Command::ScaleComponent(ScaleComponent { component_id: component_id.to_string(), @@ -471,6 +480,7 @@ mod test { model_name: MODEL_NAME.to_string(), annotations: spreadscaler_annotations("ComplexThree", daemonscaler.id()), config: vec![], + limits: None, }))); assert!(cmds.contains(&Command::ScaleComponent(ScaleComponent { component_id: component_id.to_string(), @@ -480,6 +490,7 @@ mod test { model_name: MODEL_NAME.to_string(), annotations: spreadscaler_annotations("ComplexFour", daemonscaler.id()), config: vec![], + limits: None, }))); Ok(()) @@ -562,6 +573,7 @@ mod test { echo_spread_property, "fake_echo", vec![], + None, ); let blobby_daemonscaler = ComponentDaemonScaler::new( @@ -573,6 +585,7 @@ mod test { blobby_spread_property, "fake_blobby", vec![], + None, ); // STATE SETUP BEGIN @@ -595,6 +608,7 @@ mod test { "RunInFakeCloud", echo_daemonscaler.id(), ), + limits: None, }]), ), ( @@ -606,6 +620,7 @@ mod test { "RunInRealCloud", echo_daemonscaler.id(), ), + limits: None, }]), ), ( @@ -617,6 +632,7 @@ mod test { "RunInPurgatoryCloud", echo_daemonscaler.id(), ), + limits: None, }]), ), ]), @@ -643,6 +659,7 @@ mod test { "CrossRegionCustom", blobby_daemonscaler.id(), ), + limits: None, }]), ), ( @@ -654,6 +671,7 @@ mod test { "CrossRegionReal", blobby_daemonscaler.id(), ), + limits: None, }]), ), ]), @@ -849,6 +867,7 @@ mod test { blobby_spread_property, "fake_blobby", vec![], + None, ); // STATE SETUP BEGIN @@ -870,6 +889,7 @@ mod test { "HighAvailability", blobby_daemonscaler.id(), ), + limits: None, }]), ), ( @@ -881,6 +901,7 @@ mod test { "HighAvailability", blobby_daemonscaler.id(), ), + limits: None, }]), ), ( @@ -1042,6 +1063,7 @@ mod test { "HighAvailability", blobby_daemonscaler.id(), ), + limits: None, }]), ), ( @@ -1053,6 +1075,7 @@ mod test { "HighAvailability", blobby_daemonscaler.id(), ), + limits: None, }]), ), ]), diff --git a/crates/wadm/src/scaler/spreadscaler/link.rs b/crates/wadm/src/scaler/spreadscaler/link.rs index c4f10b1a..fdaae353 100644 --- a/crates/wadm/src/scaler/spreadscaler/link.rs +++ b/crates/wadm/src/scaler/spreadscaler/link.rs @@ -574,6 +574,7 @@ mod test { component_id: echo_id.to_string(), max_instances: 1, host_id: host_id_one.to_string(), + limits: None, })) .await .expect("should be able to handle components started event"); diff --git a/crates/wadm/src/scaler/spreadscaler/mod.rs b/crates/wadm/src/scaler/spreadscaler/mod.rs index b43b2fad..b07776e8 100644 --- a/crates/wadm/src/scaler/spreadscaler/mod.rs +++ b/crates/wadm/src/scaler/spreadscaler/mod.rs @@ -42,6 +42,8 @@ struct ComponentSpreadConfig { model_name: String, /// Configuration for this SpreadScaler spread_config: SpreadScalerProperty, + /// The limits for the respecctive component, if any + limits: Option>, } /// The ComponentSpreadScaler ensures that a certain number of instances are running, @@ -156,6 +158,7 @@ impl Scaler for ComponentSpreadScaler { model_name: self.spread_config.model_name.to_owned(), annotations: BTreeMap::new(), config: self.config.clone(), + limits: self.spread_config.limits.clone(), // do i really have to do this here. })) } else { None @@ -205,7 +208,7 @@ impl Scaler for ComponentSpreadScaler { .map(|v| v == value) .unwrap_or(false) }, - ).then_some(info.count) + ).then_some(info.count) // i want to extract the limits from wadmcomponentinfo }) .sum(); (count > 0).then_some((host_id, count)) @@ -237,6 +240,7 @@ impl Scaler for ComponentSpreadScaler { model_name: self.spread_config.model_name.to_owned(), annotations: spreadscaler_annotations(&spread.name, self.id()), config: self.config.clone(), + limits: self.spread_config.limits.clone(), })]) } // Stop components to reach desired instances @@ -261,6 +265,7 @@ impl Scaler for ComponentSpreadScaler { model_name: self.spread_config.model_name.to_owned(), annotations: spreadscaler_annotations(&spread.name, self.id()), config: self.config.clone(), + limits: self.spread_config.limits.clone(), })); } (current_stopped, commands) @@ -346,6 +351,7 @@ impl ComponentSpreadScaler { spread_config: SpreadScalerProperty, component_name: &str, config: Vec, + limits: Option>, ) -> Self { // Compute the id of this scaler based on all of the configuration values // that make it unique. This is used during upgrades to determine if a @@ -369,6 +375,7 @@ impl ComponentSpreadScaler { lattice_id, spread_config, model_name, + limits, }, id, config, @@ -852,6 +859,7 @@ mod test { spread_config, "fake_component", vec![], + None, // No limits for this test ); let cmds = spreadscaler.reconcile().await?; @@ -869,7 +877,8 @@ mod test { count: 53, model_name: MODEL_NAME.to_string(), annotations: spreadscaler_annotations("EastZone", spreadscaler.id()), - config: vec![] + config: vec![], + limits: None, }))); assert!(cmds.contains(&Command::ScaleComponent(ScaleComponent { @@ -879,7 +888,8 @@ mod test { count: 3, model_name: MODEL_NAME.to_string(), annotations: spreadscaler_annotations("WestZone", spreadscaler.id()), - config: vec![] + config: vec![], + limits: None, }))); assert!(cmds.contains(&Command::ScaleComponent(ScaleComponent { @@ -889,7 +899,8 @@ mod test { count: 47, model_name: MODEL_NAME.to_string(), annotations: spreadscaler_annotations("CentralZone", spreadscaler.id()), - config: vec![] + config: vec![], + limits: None, }))); Ok(()) @@ -972,6 +983,7 @@ mod test { echo_spread_property, "fake_echo", vec![], + None, // No limits for this test ); let blobby_spreadscaler = ComponentSpreadScaler::new( @@ -983,6 +995,7 @@ mod test { blobby_spread_property, "fake_blobby", vec![], + None, // No limits for this test ); // STATE SETUP BEGIN @@ -1005,6 +1018,7 @@ mod test { echo_spreadscaler.id(), ), count: 1, + limits: None, }]), ), ( @@ -1016,6 +1030,7 @@ mod test { echo_spreadscaler.id(), ), count: 103, + limits: None, }]), ), ( @@ -1027,6 +1042,7 @@ mod test { echo_spreadscaler.id(), ), count: 400, + limits: None, }]), ), ]), @@ -1053,6 +1069,7 @@ mod test { "CrossRegionCustom", blobby_spreadscaler.id(), ), + limits: None, }]), ), ( @@ -1064,6 +1081,7 @@ mod test { "CrossRegionReal", blobby_spreadscaler.id(), ), + limits: None, }]), ), ]), @@ -1218,6 +1236,7 @@ mod test { real_spread, "fake_component", vec![], + None, // No limits for this test ); // STATE SETUP BEGIN, ONE HOST @@ -1271,6 +1290,7 @@ mod test { HashSet::from_iter([WadmComponentInfo { count: 10, annotations: spreadscaler_annotations("default", spreadscaler.id()), + limits: None, }]), ), ( @@ -1278,6 +1298,7 @@ mod test { HashSet::from_iter([WadmComponentInfo { count: 10, annotations: spreadscaler_annotations("default", spreadscaler.id()), + limits: None, }]), ), ]), @@ -1322,7 +1343,8 @@ mod test { count: 0, model_name: MODEL_NAME.to_string(), annotations: spreadscaler_annotations("default", spreadscaler.id()), - config: vec![] + config: vec![], + limits: None, }))); assert!(cmds.contains(&Command::ScaleComponent(ScaleComponent { component_id: component_id.clone(), @@ -1331,7 +1353,8 @@ mod test { count: 0, model_name: MODEL_NAME.to_string(), annotations: spreadscaler_annotations("default", spreadscaler.id()), - config: vec![] + config: vec![], + limits: None, }))); Ok(()) } @@ -1405,6 +1428,7 @@ mod test { blobby_spread_property, "fake_blobby", vec![], + None, ); // STATE SETUP BEGIN @@ -1426,6 +1450,7 @@ mod test { "CrossRegionCustom", blobby_spreadscaler.id(), ), + limits: None, }]), ), ( @@ -1437,6 +1462,7 @@ mod test { "CrossRegionReal", blobby_spreadscaler.id(), ), + limits: None, }]), ), ]), @@ -1577,6 +1603,7 @@ mod test { host_id: host_id_two.to_string(), max_instances: 0, claims: None, + limits: None, }; worker @@ -1835,6 +1862,7 @@ mod test { spread_property, &component_name, vec![], + None, ); spreadscaler.reconcile().await?; @@ -1895,6 +1923,7 @@ mod test { spread_property, component_name, vec![], + None, // No limits for this test ); // Create components with the specified labels and add them to the store @@ -1917,6 +1946,7 @@ mod test { "realcloud", spreadscaler.id(), ), + limits: None, }, WadmComponentInfo { count: 11, @@ -1924,6 +1954,7 @@ mod test { "eastcoast", spreadscaler.id(), ), + limits: None, }, ]), ), @@ -1942,6 +1973,7 @@ mod test { "realcloud", spreadscaler.id(), ), + limits: None, }, WadmComponentInfo { count: 33, @@ -1949,6 +1981,7 @@ mod test { "westcoast", spreadscaler.id(), ), + limits: None, }, ]), ), @@ -1961,6 +1994,7 @@ mod test { "realcloud", spreadscaler.id(), ), + limits: None, }]), ), ]), diff --git a/crates/wadm/src/storage/reaper.rs b/crates/wadm/src/storage/reaper.rs index 85840005..8c3aa35a 100644 --- a/crates/wadm/src/storage/reaper.rs +++ b/crates/wadm/src/storage/reaper.rs @@ -282,6 +282,7 @@ mod test { HashSet::from_iter([WadmComponentInfo { annotations: BTreeMap::default(), count: 1, + limits: None, }]), ), ( @@ -289,6 +290,7 @@ mod test { HashSet::from_iter([WadmComponentInfo { annotations: BTreeMap::default(), count: 1, + limits: None, }]), ), ]), @@ -304,6 +306,7 @@ mod test { HashSet::from_iter([WadmComponentInfo { annotations: BTreeMap::default(), count: 1, + limits: None, }]), )]), ..Default::default() @@ -406,6 +409,7 @@ mod test { HashSet::from_iter([WadmComponentInfo { annotations: BTreeMap::default(), count: 1, + limits: None, }]), ), ( @@ -413,6 +417,7 @@ mod test { HashSet::from_iter([WadmComponentInfo { annotations: BTreeMap::default(), count: 1, + limits: None, }]), ), ]), diff --git a/crates/wadm/src/storage/state.rs b/crates/wadm/src/storage/state.rs index 1fb43b7d..98c38c0a 100644 --- a/crates/wadm/src/storage/state.rs +++ b/crates/wadm/src/storage/state.rs @@ -109,6 +109,7 @@ impl From<&ProviderStarted> for Provider { pub struct WadmComponentInfo { pub annotations: BTreeMap, pub count: usize, + pub limits: Option>, } impl PartialEq for WadmComponentInfo { @@ -186,6 +187,7 @@ impl From for Component { HashSet::from_iter([WadmComponentInfo { annotations: value.annotations, count: value.max_instances, + limits: value.limits, }]), )]), } @@ -212,6 +214,7 @@ impl From<&ComponentScaled> for Component { HashSet::from_iter([WadmComponentInfo { annotations: value.annotations.clone(), count: value.max_instances, + limits: value.limits.clone(), }]), )]), } diff --git a/crates/wadm/src/workers/command.rs b/crates/wadm/src/workers/command.rs index d89aaaa8..b185f8e0 100644 --- a/crates/wadm/src/workers/command.rs +++ b/crates/wadm/src/workers/command.rs @@ -42,7 +42,7 @@ impl Worker for CommandWorker { &component.component_id, component.count, Some(annotations.into_iter().collect()), - component.config.clone(), + component.config.clone(), // Add the limits field here after final merge of #4451 ) .await } diff --git a/crates/wadm/src/workers/event.rs b/crates/wadm/src/workers/event.rs index 9502570b..85df4527 100644 --- a/crates/wadm/src/workers/event.rs +++ b/crates/wadm/src/workers/event.rs @@ -87,12 +87,14 @@ where // If the component is running and is now scaled down to zero, remove it Some(current_instances) if component.max_instances == 0 => { current_instances.remove(&component.annotations); + // Removing the limits will set the limits to default runtime values } // If a component is already running on a host, update the running count to the scaled max_instances value Some(current_instances) => { current_instances.replace(WadmComponentInfo { count: component.max_instances, annotations: component.annotations.clone(), + limits: component.limits.clone(), }); } // Component is not running and now scaled to zero, no action required. This can happen if we @@ -105,6 +107,7 @@ where HashSet::from([WadmComponentInfo { count: component.max_instances, annotations: component.annotations.clone(), + limits: component.limits.clone(), }]), ); } @@ -481,7 +484,6 @@ where .await .map_err(anyhow::Error::from) } - // END HANDLER FUNCTIONS async fn populate_component_info( &self, @@ -495,11 +497,13 @@ where .into_iter() .map(|component_description| { let instance = HashSet::from_iter([WadmComponentInfo { + // Needs #4451 merged count: component_description.max_instances() as usize, annotations: component_description .annotations() .cloned() .unwrap_or_default(), + limits: None, // Needs #4451 merged for actually setting limits from component description }]); if let Some(component) = components.get(component_description.id()) { // Construct modified Component with new instances included @@ -1161,6 +1165,7 @@ mod test { host_id: host1_id.into(), annotations: BTreeMap::default(), max_instances: 500, + limits: None, }; worker .handle_component_scaled(lattice_id, &component1_scaled) @@ -1182,6 +1187,7 @@ mod test { 500, "Component count should be modified with an increase in scale" ); + // Add assert_eq for the limits check. let component1_scaled = ComponentScaled { claims: Some(ComponentClaims { @@ -1196,6 +1202,7 @@ mod test { host_id: host1_id.into(), annotations: BTreeMap::default(), max_instances: 200, + limits: None, }; worker .handle_component_scaled(lattice_id, &component1_scaled) @@ -1231,6 +1238,7 @@ mod test { host_id: host1_id.into(), annotations: BTreeMap::default(), max_instances: 0, + limits: None, }; worker .handle_component_scaled(lattice_id, &component1_scaled) @@ -1262,6 +1270,7 @@ mod test { host_id: host1_id.into(), annotations: BTreeMap::default(), max_instances: 1, + limits: None, }; worker .handle_component_scaled(lattice_id, &component1_scaled) @@ -1307,6 +1316,7 @@ mod test { component_id: "DARTHVADER".into(), annotations: BTreeMap::default(), max_instances: 2, + limits: None, }; worker @@ -1522,6 +1532,7 @@ mod test { component_id: component_1_id.into(), host_id: host1_id.into(), max_instances: 0, + limits: None, }; worker @@ -2076,6 +2087,7 @@ mod test { HashSet::from_iter([WadmComponentInfo { count: 1, annotations: BTreeMap::default(), + limits: None, }]), )]), ..Default::default() diff --git a/tests/command_consumer_integration.rs b/tests/command_consumer_integration.rs index 548b97a1..ffc60b58 100644 --- a/tests/command_consumer_integration.rs +++ b/tests/command_consumer_integration.rs @@ -29,6 +29,7 @@ async fn test_consumer_stream() { model_name: "fake".into(), annotations: BTreeMap::new(), config: vec![], + limits: None, }) .await; wrapper @@ -166,6 +167,7 @@ async fn test_nack_and_rereceive() { model_name: "fake".into(), annotations: BTreeMap::new(), config: vec![], + limits: None, }) .await; diff --git a/tests/command_worker_integration.rs b/tests/command_worker_integration.rs index b62e2d93..0ae195cb 100644 --- a/tests/command_worker_integration.rs +++ b/tests/command_worker_integration.rs @@ -53,6 +53,7 @@ async fn test_commands() { model_name: "fake".into(), annotations: BTreeMap::new(), config: vec![], + limits: None, }) .await; @@ -295,6 +296,7 @@ async fn test_commands() { model_name: "fake".into(), annotations: BTreeMap::new(), config: vec![], + limits: None, }) .await; @@ -433,6 +435,7 @@ async fn test_annotation_stop() { model_name: "fake".into(), annotations: BTreeMap::from_iter([("fake".to_string(), "wake".to_string())]), config: vec![], + limits: None, }) .await; diff --git a/tests/storage_nats_kv.rs b/tests/storage_nats_kv.rs index 69c763fd..52a68d66 100644 --- a/tests/storage_nats_kv.rs +++ b/tests/storage_nats_kv.rs @@ -37,6 +37,7 @@ async fn test_round_trip() { HashSet::from_iter([WadmComponentInfo { count: 1, annotations: BTreeMap::new(), + limits: None, }]), )]), reference: "fake.oci.repo/testcomponent:0.1.0".to_string(), @@ -51,6 +52,7 @@ async fn test_round_trip() { HashSet::from_iter([WadmComponentInfo { count: 1, annotations: BTreeMap::new(), + limits: None, }]), )]), reference: "fake.oci.repo/anothercomponent:0.1.0".to_string(), @@ -235,6 +237,7 @@ async fn test_multiple_lattice() { HashSet::from_iter([WadmComponentInfo { count: 1, annotations: BTreeMap::new(), + limits: None, }]), )]), reference: "fake.oci.repo/testcomponent:0.1.0".to_string(), @@ -249,6 +252,7 @@ async fn test_multiple_lattice() { HashSet::from_iter([WadmComponentInfo { count: 1, annotations: BTreeMap::new(), + limits: None, }]), )]), reference: "fake.oci.repo/anothercomponent:0.1.0".to_string(), @@ -322,6 +326,7 @@ async fn test_store_and_delete_many() { HashSet::from_iter([WadmComponentInfo { count: 1, annotations: BTreeMap::new(), + limits: None, }]), )]), reference: "fake.oci.repo/testcomponent:0.1.0".to_string(), @@ -336,6 +341,7 @@ async fn test_store_and_delete_many() { HashSet::from_iter([WadmComponentInfo { count: 1, annotations: BTreeMap::new(), + limits: None, }]), )]), reference: "fake.oci.repo/anothercomponent:0.1.0".to_string(),