diff --git a/packages/common/gasoline/core/src/activity.rs b/packages/common/gasoline/core/src/activity.rs index 63d6eb7a28..966056491f 100644 --- a/packages/common/gasoline/core/src/activity.rs +++ b/packages/common/gasoline/core/src/activity.rs @@ -12,6 +12,7 @@ pub trait Activity { type Output: Serialize + DeserializeOwned + Debug + Send; const NAME: &'static str; + /// Seconds. const MAX_RETRIES: usize; const TIMEOUT: std::time::Duration; diff --git a/packages/common/gasoline/core/src/ctx/workflow.rs b/packages/common/gasoline/core/src/ctx/workflow.rs index e35f3fc46f..ed0b1b066e 100644 --- a/packages/common/gasoline/core/src/ctx/workflow.rs +++ b/packages/common/gasoline/core/src/ctx/workflow.rs @@ -612,7 +612,7 @@ impl WorkflowCtx { // errors let err = match err { WorkflowError::ActivityFailure(err, _) => { - if error_count + 1 >= I::Activity::MAX_RETRIES { + if error_count.saturating_add(1) >= I::Activity::MAX_RETRIES { WorkflowError::ActivityMaxFailuresReached(err) } else { // Add error count to the error for backoff calculation @@ -620,7 +620,7 @@ impl WorkflowCtx { } } WorkflowError::ActivityTimeout(_) => { - if error_count + 1 >= I::Activity::MAX_RETRIES { + if error_count.saturating_add(1) >= I::Activity::MAX_RETRIES { WorkflowError::ActivityMaxFailuresReached(err.into()) } else { // Add error count to the error for backoff calculation @@ -628,7 +628,7 @@ impl WorkflowCtx { } } WorkflowError::OperationTimeout(_) => { - if error_count + 1 >= I::Activity::MAX_RETRIES { + if error_count.saturating_add(1) >= I::Activity::MAX_RETRIES { WorkflowError::ActivityMaxFailuresReached(err.into()) } else { // Add error count to the error for backoff calculation diff --git a/packages/common/gasoline/macros/src/lib.rs b/packages/common/gasoline/macros/src/lib.rs index 9bc8f3525e..5f05f1af41 100644 --- a/packages/common/gasoline/macros/src/lib.rs +++ b/packages/common/gasoline/macros/src/lib.rs @@ -7,17 +7,25 @@ use syn::{ spanned::Spanned, }; +#[derive(Default)] struct Config { - max_retries: usize, - timeout: u64, + max_retries: Option, + timeout: Option, } -impl Default for Config { - fn default() -> Self { - Config { - max_retries: 5, - timeout: 30, - } +impl Config { + fn max_retries(&self) -> proc_macro2::TokenStream { + self.max_retries + .as_ref() + .map(|e| e.to_token_stream()) + .unwrap_or_else(|| quote! { 5 }) + } + + fn timeout(&self) -> proc_macro2::TokenStream { + self.timeout + .as_ref() + .map(|e| e.to_token_stream()) + .unwrap_or_else(|| quote! { 30 }) } } @@ -130,8 +138,8 @@ pub fn activity(attr: TokenStream, item: TokenStream) -> TokenStream { let fn_body = item_fn.block; let vis = item_fn.vis; - let max_retries = config.max_retries; - let timeout = config.timeout; + let max_retries = config.max_retries(); + let timeout = config.timeout(); let expanded = quote! { #vis struct #struct_ident; @@ -201,7 +209,7 @@ pub fn operation(attr: TokenStream, item: TokenStream) -> TokenStream { let fn_body = item_fn.block; let vis = item_fn.vis; - let timeout = config.timeout; + let timeout = config.timeout(); let expanded = quote! { #vis struct #struct_ident; @@ -441,12 +449,13 @@ fn parse_config(attrs: &[syn::Attribute]) -> syn::Result { // Verify config property if ident == "max_retries" { - config.max_retries = - syn::parse::(name_value.value.to_token_stream().into())? - .base10_parse()?; + config.max_retries = Some(syn::parse::( + name_value.value.to_token_stream().into(), + )?); } else if ident == "timeout" { - config.timeout = syn::parse::(name_value.value.to_token_stream().into())? - .base10_parse()?; + config.timeout = Some(syn::parse::( + name_value.value.to_token_stream().into(), + )?); } else if ident != "doc" { return Err(syn::Error::new( name_value.span(), diff --git a/packages/services/epoxy/src/workflows/coordinator/reconfigure.rs b/packages/services/epoxy/src/workflows/coordinator/reconfigure.rs index c82ee4ac2e..ad11379eaa 100644 --- a/packages/services/epoxy/src/workflows/coordinator/reconfigure.rs +++ b/packages/services/epoxy/src/workflows/coordinator/reconfigure.rs @@ -135,6 +135,7 @@ pub struct HealthCheckNewReplicasInput { } #[activity(HealthCheckNewReplicas)] +#[max_retries = usize::MAX] pub async fn health_check_new_replicas( ctx: &ActivityCtx, input: &HealthCheckNewReplicasInput, diff --git a/packages/services/epoxy/src/workflows/replica/setup.rs b/packages/services/epoxy/src/workflows/replica/setup.rs index 662a1f0524..c036cef4b9 100644 --- a/packages/services/epoxy/src/workflows/replica/setup.rs +++ b/packages/services/epoxy/src/workflows/replica/setup.rs @@ -740,6 +740,7 @@ pub struct NotifyActiveInput { } #[activity(NotifyActive)] +#[max_retries = usize::MAX] pub async fn notify_active(ctx: &ActivityCtx, input: &NotifyActiveInput) -> Result<()> { let config = &input.learning_config; let proto_config: protocol::ClusterConfig = config.clone().into();