Skip to content

Commit 087ee94

Browse files
MasterPtatoNathanFlurry
authored andcommitted
fix: make epoxy activities retry forever
1 parent ac86e79 commit 087ee94

File tree

5 files changed

+31
-19
lines changed

5 files changed

+31
-19
lines changed

packages/common/gasoline/core/src/activity.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ pub trait Activity {
1212
type Output: Serialize + DeserializeOwned + Debug + Send;
1313

1414
const NAME: &'static str;
15+
/// Seconds.
1516
const MAX_RETRIES: usize;
1617
const TIMEOUT: std::time::Duration;
1718

packages/common/gasoline/core/src/ctx/workflow.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -612,23 +612,23 @@ impl WorkflowCtx {
612612
// errors
613613
let err = match err {
614614
WorkflowError::ActivityFailure(err, _) => {
615-
if error_count + 1 >= I::Activity::MAX_RETRIES {
615+
if error_count.saturating_add(1) >= I::Activity::MAX_RETRIES {
616616
WorkflowError::ActivityMaxFailuresReached(err)
617617
} else {
618618
// Add error count to the error for backoff calculation
619619
WorkflowError::ActivityFailure(err, error_count)
620620
}
621621
}
622622
WorkflowError::ActivityTimeout(_) => {
623-
if error_count + 1 >= I::Activity::MAX_RETRIES {
623+
if error_count.saturating_add(1) >= I::Activity::MAX_RETRIES {
624624
WorkflowError::ActivityMaxFailuresReached(err.into())
625625
} else {
626626
// Add error count to the error for backoff calculation
627627
WorkflowError::ActivityTimeout(error_count)
628628
}
629629
}
630630
WorkflowError::OperationTimeout(_) => {
631-
if error_count + 1 >= I::Activity::MAX_RETRIES {
631+
if error_count.saturating_add(1) >= I::Activity::MAX_RETRIES {
632632
WorkflowError::ActivityMaxFailuresReached(err.into())
633633
} else {
634634
// Add error count to the error for backoff calculation

packages/common/gasoline/macros/src/lib.rs

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,25 @@ use syn::{
77
spanned::Spanned,
88
};
99

10+
#[derive(Default)]
1011
struct Config {
11-
max_retries: usize,
12-
timeout: u64,
12+
max_retries: Option<syn::Expr>,
13+
timeout: Option<syn::Expr>,
1314
}
1415

15-
impl Default for Config {
16-
fn default() -> Self {
17-
Config {
18-
max_retries: 5,
19-
timeout: 30,
20-
}
16+
impl Config {
17+
fn max_retries(&self) -> proc_macro2::TokenStream {
18+
self.max_retries
19+
.as_ref()
20+
.map(|e| e.to_token_stream())
21+
.unwrap_or_else(|| quote! { 5 })
22+
}
23+
24+
fn timeout(&self) -> proc_macro2::TokenStream {
25+
self.timeout
26+
.as_ref()
27+
.map(|e| e.to_token_stream())
28+
.unwrap_or_else(|| quote! { 30 })
2129
}
2230
}
2331

@@ -130,8 +138,8 @@ pub fn activity(attr: TokenStream, item: TokenStream) -> TokenStream {
130138
let fn_body = item_fn.block;
131139
let vis = item_fn.vis;
132140

133-
let max_retries = config.max_retries;
134-
let timeout = config.timeout;
141+
let max_retries = config.max_retries();
142+
let timeout = config.timeout();
135143

136144
let expanded = quote! {
137145
#vis struct #struct_ident;
@@ -201,7 +209,7 @@ pub fn operation(attr: TokenStream, item: TokenStream) -> TokenStream {
201209
let fn_body = item_fn.block;
202210
let vis = item_fn.vis;
203211

204-
let timeout = config.timeout;
212+
let timeout = config.timeout();
205213

206214
let expanded = quote! {
207215
#vis struct #struct_ident;
@@ -441,12 +449,13 @@ fn parse_config(attrs: &[syn::Attribute]) -> syn::Result<Config> {
441449

442450
// Verify config property
443451
if ident == "max_retries" {
444-
config.max_retries =
445-
syn::parse::<syn::LitInt>(name_value.value.to_token_stream().into())?
446-
.base10_parse()?;
452+
config.max_retries = Some(syn::parse::<syn::Expr>(
453+
name_value.value.to_token_stream().into(),
454+
)?);
447455
} else if ident == "timeout" {
448-
config.timeout = syn::parse::<syn::LitInt>(name_value.value.to_token_stream().into())?
449-
.base10_parse()?;
456+
config.timeout = Some(syn::parse::<syn::Expr>(
457+
name_value.value.to_token_stream().into(),
458+
)?);
450459
} else if ident != "doc" {
451460
return Err(syn::Error::new(
452461
name_value.span(),

packages/services/epoxy/src/workflows/coordinator/reconfigure.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ pub struct HealthCheckNewReplicasInput {
135135
}
136136

137137
#[activity(HealthCheckNewReplicas)]
138+
#[max_retries = usize::MAX]
138139
pub async fn health_check_new_replicas(
139140
ctx: &ActivityCtx,
140141
input: &HealthCheckNewReplicasInput,

packages/services/epoxy/src/workflows/replica/setup.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -740,6 +740,7 @@ pub struct NotifyActiveInput {
740740
}
741741

742742
#[activity(NotifyActive)]
743+
#[max_retries = usize::MAX]
743744
pub async fn notify_active(ctx: &ActivityCtx, input: &NotifyActiveInput) -> Result<()> {
744745
let config = &input.learning_config;
745746
let proto_config: protocol::ClusterConfig = config.clone().into();

0 commit comments

Comments
 (0)