Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/common/gasoline/core/src/activity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub trait Activity {
type Output: Serialize + DeserializeOwned + Debug + Send;

const NAME: &'static str;
/// Seconds.
const MAX_RETRIES: usize;
const TIMEOUT: std::time::Duration;

Expand Down
6 changes: 3 additions & 3 deletions packages/common/gasoline/core/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@
impl WorkflowCtx {
/// Creates a sub workflow builder.
pub fn workflow<I>(
&mut self,

Check warning on line 556 in packages/common/gasoline/core/src/ctx/workflow.rs

View workflow job for this annotation

GitHub Actions / Test

hiding a lifetime that's elided elsewhere is confusing
input: impl WorkflowRepr<I>,
) -> builder::sub_workflow::SubWorkflowBuilder<impl WorkflowRepr<I>, I>
where
Expand Down Expand Up @@ -612,23 +612,23 @@
// errors
let err = match err {
WorkflowError::ActivityFailure(err, _) => {
if error_count + 1 >= I::Activity::MAX_RETRIES {
if error_count.saturating_add(1) >= I::Activity::MAX_RETRIES {
WorkflowError::ActivityMaxFailuresReached(err)
} else {
// Add error count to the error for backoff calculation
WorkflowError::ActivityFailure(err, error_count)
}
}
WorkflowError::ActivityTimeout(_) => {
if error_count + 1 >= I::Activity::MAX_RETRIES {
if error_count.saturating_add(1) >= I::Activity::MAX_RETRIES {
WorkflowError::ActivityMaxFailuresReached(err.into())
} else {
// Add error count to the error for backoff calculation
WorkflowError::ActivityTimeout(error_count)
}
}
WorkflowError::OperationTimeout(_) => {
if error_count + 1 >= I::Activity::MAX_RETRIES {
if error_count.saturating_add(1) >= I::Activity::MAX_RETRIES {
WorkflowError::ActivityMaxFailuresReached(err.into())
} else {
// Add error count to the error for backoff calculation
Expand Down
41 changes: 25 additions & 16 deletions packages/common/gasoline/macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,25 @@ use syn::{
spanned::Spanned,
};

#[derive(Default)]
struct Config {
max_retries: usize,
timeout: u64,
max_retries: Option<syn::Expr>,
timeout: Option<syn::Expr>,
}

impl Default for Config {
fn default() -> Self {
Config {
max_retries: 5,
timeout: 30,
}
impl Config {
fn max_retries(&self) -> proc_macro2::TokenStream {
self.max_retries
.as_ref()
.map(|e| e.to_token_stream())
.unwrap_or_else(|| quote! { 5 })
}

fn timeout(&self) -> proc_macro2::TokenStream {
self.timeout
.as_ref()
.map(|e| e.to_token_stream())
.unwrap_or_else(|| quote! { 30 })
}
}

Expand Down Expand Up @@ -130,8 +138,8 @@ pub fn activity(attr: TokenStream, item: TokenStream) -> TokenStream {
let fn_body = item_fn.block;
let vis = item_fn.vis;

let max_retries = config.max_retries;
let timeout = config.timeout;
let max_retries = config.max_retries();
let timeout = config.timeout();

let expanded = quote! {
#vis struct #struct_ident;
Expand Down Expand Up @@ -201,7 +209,7 @@ pub fn operation(attr: TokenStream, item: TokenStream) -> TokenStream {
let fn_body = item_fn.block;
let vis = item_fn.vis;

let timeout = config.timeout;
let timeout = config.timeout();

let expanded = quote! {
#vis struct #struct_ident;
Expand Down Expand Up @@ -441,12 +449,13 @@ fn parse_config(attrs: &[syn::Attribute]) -> syn::Result<Config> {

// Verify config property
if ident == "max_retries" {
config.max_retries =
syn::parse::<syn::LitInt>(name_value.value.to_token_stream().into())?
.base10_parse()?;
config.max_retries = Some(syn::parse::<syn::Expr>(
name_value.value.to_token_stream().into(),
)?);
} else if ident == "timeout" {
config.timeout = syn::parse::<syn::LitInt>(name_value.value.to_token_stream().into())?
.base10_parse()?;
config.timeout = Some(syn::parse::<syn::Expr>(
name_value.value.to_token_stream().into(),
)?);
} else if ident != "doc" {
return Err(syn::Error::new(
name_value.span(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ pub struct HealthCheckNewReplicasInput {
}

#[activity(HealthCheckNewReplicas)]
#[max_retries = usize::MAX]
pub async fn health_check_new_replicas(
ctx: &ActivityCtx,
input: &HealthCheckNewReplicasInput,
Expand Down
1 change: 1 addition & 0 deletions packages/services/epoxy/src/workflows/replica/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,7 @@ pub struct NotifyActiveInput {
}

#[activity(NotifyActive)]
#[max_retries = usize::MAX]
pub async fn notify_active(ctx: &ActivityCtx, input: &NotifyActiveInput) -> Result<()> {
let config = &input.learning_config;
let proto_config: protocol::ClusterConfig = config.clone().into();
Expand Down
Loading