From 9e323b2efa0257aec209ec77d3d3371de1e38c7a Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 5 Jul 2024 07:28:19 -0400 Subject: [PATCH 1/9] Add failing test --- tests/worker/test_activity.py | 38 +++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/tests/worker/test_activity.py b/tests/worker/test_activity.py index c981a98c8..463b0c292 100644 --- a/tests/worker/test_activity.py +++ b/tests/worker/test_activity.py @@ -41,6 +41,7 @@ Worker, WorkerConfig, ) +from tests.helpers import new_worker from tests.helpers.worker import ( ExternalWorker, KSAction, @@ -1351,3 +1352,40 @@ def assert_activity_application_error( ret = assert_activity_error(err) assert isinstance(ret, ApplicationError) return ret + + +@activity.defn +async def activity_with_retry_delay(retry_delay_seconds: float): + if activity.info().attempt == 1: + raise ApplicationError( + "Deliberately failing with next_retry_delay set", + next_retry_delay=timedelta(seconds=retry_delay_seconds), + ) + + +@workflow.defn +class ActivitiesWithRetryDelayWorkflow: + @workflow.run + async def run(self, retry_delay_seconds: float) -> float: + t0 = workflow.time() + await workflow.execute_activity( + activity_with_retry_delay, + retry_delay_seconds, + schedule_to_close_timeout=timedelta(seconds=retry_delay_seconds * 2), + ) + t1 = workflow.time() + return t1 - t0 + + +async def test_activity_retry_delay(client: Client): + retry_delay = timedelta(seconds=2) + async with new_worker( + client, ActivitiesWithRetryDelayWorkflow, activities=[activity_with_retry_delay] + ) as worker: + workflow_duration = await client.execute_workflow( + ActivitiesWithRetryDelayWorkflow.run, + retry_delay.total_seconds(), + id=str(uuid.uuid4()), + task_queue=worker.task_queue, + ) + assert workflow_duration > retry_delay.total_seconds() From 6b08b66c40bbddc5abc104ea7f5b0048134fffa2 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 5 Jul 2024 10:04:41 -0400 Subject: [PATCH 2/9] Feature: support next_retry_delay when raising ApplicationError from an activity --- temporalio/converter.py | 5 +++++ temporalio/exceptions.py | 13 +++++++++++++ 2 files changed, 18 insertions(+) diff --git a/temporalio/converter.py b/temporalio/converter.py index d3cb94ce9..887f685a5 100644 --- a/temporalio/converter.py +++ b/temporalio/converter.py @@ -34,6 +34,7 @@ overload, ) +import google.protobuf.duration_pb2 import google.protobuf.json_format import google.protobuf.message import google.protobuf.symbol_database @@ -843,6 +844,10 @@ def _error_to_failure( failure.application_failure_info.details.CopyFrom( payload_converter.to_payloads_wrapper(error.details) ) + if error.next_retry_delay: + delay = google.protobuf.duration_pb2.Duration() + delay.FromTimedelta(error.next_retry_delay) + failure.application_failure_info.next_retry_delay.CopyFrom(delay) elif isinstance(error, temporalio.exceptions.TimeoutError): failure.timeout_failure_info.SetInParent() failure.timeout_failure_info.timeout_type = ( diff --git a/temporalio/exceptions.py b/temporalio/exceptions.py index d80190ba9..be51eddff 100644 --- a/temporalio/exceptions.py +++ b/temporalio/exceptions.py @@ -1,6 +1,7 @@ """Common Temporal exceptions.""" import asyncio +from datetime import timedelta from enum import IntEnum from typing import Any, Optional, Sequence, Tuple @@ -78,6 +79,7 @@ def __init__( *details: Any, type: Optional[str] = None, non_retryable: bool = False, + next_retry_delay: Optional[timedelta] = None, ) -> None: """Initialize an application error.""" super().__init__( @@ -88,6 +90,7 @@ def __init__( self._details = details self._type = type self._non_retryable = non_retryable + self._next_retry_delay = next_retry_delay @property def details(self) -> Sequence[Any]: @@ -109,6 +112,16 @@ def non_retryable(self) -> bool: """ return self._non_retryable + @property + def next_retry_delay(self) -> Optional[timedelta]: + """Delay before the next activity retry attempt. + + User activity code may set this when raising ApplicationError to specify + a delay before the next activity retry. Ignored if set when raising + ApplicationError from workflow code. + """ + return self._next_retry_delay + class CancelledError(FailureError): """Error raised on workflow/activity cancellation.""" From f7c2ed6214504dcb1c6c7887bb79ea2891fe4782 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 10 Jul 2024 09:39:00 -0400 Subject: [PATCH 3/9] Streamline use of Python proto APIs --- temporalio/converter.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/temporalio/converter.py b/temporalio/converter.py index 887f685a5..587098002 100644 --- a/temporalio/converter.py +++ b/temporalio/converter.py @@ -845,9 +845,9 @@ def _error_to_failure( payload_converter.to_payloads_wrapper(error.details) ) if error.next_retry_delay: - delay = google.protobuf.duration_pb2.Duration() - delay.FromTimedelta(error.next_retry_delay) - failure.application_failure_info.next_retry_delay.CopyFrom(delay) + failure.application_failure_info.next_retry_delay.FromTimedelta( + error.next_retry_delay + ) elif isinstance(error, temporalio.exceptions.TimeoutError): failure.timeout_failure_info.SetInParent() failure.timeout_failure_info.timeout_type = ( From 6eabe6919ea271b98722f65ca1576bd1cfed657f Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 10 Jul 2024 09:46:57 -0400 Subject: [PATCH 4/9] Update doctsring to reflect https://github.com/temporalio/temporal/pull/5946 --- temporalio/exceptions.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/temporalio/exceptions.py b/temporalio/exceptions.py index be51eddff..95776623b 100644 --- a/temporalio/exceptions.py +++ b/temporalio/exceptions.py @@ -117,8 +117,7 @@ def next_retry_delay(self) -> Optional[timedelta]: """Delay before the next activity retry attempt. User activity code may set this when raising ApplicationError to specify - a delay before the next activity retry. Ignored if set when raising - ApplicationError from workflow code. + a delay before the next activity retry. """ return self._next_retry_delay From 416fc643d074c92d0789363573a98cc1da634b65 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Sun, 18 Aug 2024 22:08:57 -0400 Subject: [PATCH 5/9] Don't wait for retry delay in test --- tests/worker/test_activity.py | 47 ++++++++++++++++++++--------------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/tests/worker/test_activity.py b/tests/worker/test_activity.py index 463b0c292..3791d8c40 100644 --- a/tests/worker/test_activity.py +++ b/tests/worker/test_activity.py @@ -1355,37 +1355,44 @@ def assert_activity_application_error( @activity.defn -async def activity_with_retry_delay(retry_delay_seconds: float): - if activity.info().attempt == 1: - raise ApplicationError( - "Deliberately failing with next_retry_delay set", - next_retry_delay=timedelta(seconds=retry_delay_seconds), - ) +async def activity_with_retry_delay(): + raise ApplicationError( + ActivitiesWithRetryDelayWorkflow.error_message, + next_retry_delay=ActivitiesWithRetryDelayWorkflow.next_retry_delay, + ) @workflow.defn class ActivitiesWithRetryDelayWorkflow: + error_message = "Deliberately failing with next_retry_delay set" + next_retry_delay = timedelta(milliseconds=5) + @workflow.run - async def run(self, retry_delay_seconds: float) -> float: - t0 = workflow.time() + async def run(self) -> None: await workflow.execute_activity( activity_with_retry_delay, - retry_delay_seconds, - schedule_to_close_timeout=timedelta(seconds=retry_delay_seconds * 2), + retry_policy=RetryPolicy(maximum_attempts=2), + schedule_to_close_timeout=self.next_retry_delay, ) - t1 = workflow.time() - return t1 - t0 async def test_activity_retry_delay(client: Client): - retry_delay = timedelta(seconds=2) async with new_worker( client, ActivitiesWithRetryDelayWorkflow, activities=[activity_with_retry_delay] ) as worker: - workflow_duration = await client.execute_workflow( - ActivitiesWithRetryDelayWorkflow.run, - retry_delay.total_seconds(), - id=str(uuid.uuid4()), - task_queue=worker.task_queue, - ) - assert workflow_duration > retry_delay.total_seconds() + try: + await client.execute_workflow( + ActivitiesWithRetryDelayWorkflow.run, + id=str(uuid.uuid4()), + task_queue=worker.task_queue, + ) + except WorkflowFailureError as err: + assert isinstance(err.cause, ActivityError) + assert isinstance(err.cause.cause, ApplicationError) + assert ( + str(err.cause.cause) == ActivitiesWithRetryDelayWorkflow.error_message + ) + assert ( + err.cause.cause.next_retry_delay + == ActivitiesWithRetryDelayWorkflow.next_retry_delay + ) From 9994a1def8c65231d6083357b078c410d976a18d Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Sun, 18 Aug 2024 22:09:04 -0400 Subject: [PATCH 6/9] Add new field to exception converter --- temporalio/converter.py | 1 + 1 file changed, 1 insertion(+) diff --git a/temporalio/converter.py b/temporalio/converter.py index 587098002..24f9b6b47 100644 --- a/temporalio/converter.py +++ b/temporalio/converter.py @@ -933,6 +933,7 @@ def from_failure( *payload_converter.from_payloads_wrapper(app_info.details), type=app_info.type or None, non_retryable=app_info.non_retryable, + next_retry_delay=app_info.next_retry_delay.ToTimedelta(), ) elif failure.HasField("timeout_failure_info"): timeout_info = failure.timeout_failure_info From f2902ec532bb34451f15c1250ceff169e4cf0bce Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Mon, 19 Aug 2024 10:23:55 -0400 Subject: [PATCH 7/9] Relocate test to sandbox-friendly file --- tests/worker/test_activity.py | 45 ----------------------------------- tests/worker/test_workflow.py | 44 ++++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 45 deletions(-) diff --git a/tests/worker/test_activity.py b/tests/worker/test_activity.py index 3791d8c40..c981a98c8 100644 --- a/tests/worker/test_activity.py +++ b/tests/worker/test_activity.py @@ -41,7 +41,6 @@ Worker, WorkerConfig, ) -from tests.helpers import new_worker from tests.helpers.worker import ( ExternalWorker, KSAction, @@ -1352,47 +1351,3 @@ def assert_activity_application_error( ret = assert_activity_error(err) assert isinstance(ret, ApplicationError) return ret - - -@activity.defn -async def activity_with_retry_delay(): - raise ApplicationError( - ActivitiesWithRetryDelayWorkflow.error_message, - next_retry_delay=ActivitiesWithRetryDelayWorkflow.next_retry_delay, - ) - - -@workflow.defn -class ActivitiesWithRetryDelayWorkflow: - error_message = "Deliberately failing with next_retry_delay set" - next_retry_delay = timedelta(milliseconds=5) - - @workflow.run - async def run(self) -> None: - await workflow.execute_activity( - activity_with_retry_delay, - retry_policy=RetryPolicy(maximum_attempts=2), - schedule_to_close_timeout=self.next_retry_delay, - ) - - -async def test_activity_retry_delay(client: Client): - async with new_worker( - client, ActivitiesWithRetryDelayWorkflow, activities=[activity_with_retry_delay] - ) as worker: - try: - await client.execute_workflow( - ActivitiesWithRetryDelayWorkflow.run, - id=str(uuid.uuid4()), - task_queue=worker.task_queue, - ) - except WorkflowFailureError as err: - assert isinstance(err.cause, ActivityError) - assert isinstance(err.cause.cause, ApplicationError) - assert ( - str(err.cause.cause) == ActivitiesWithRetryDelayWorkflow.error_message - ) - assert ( - err.cause.cause.next_retry_delay - == ActivitiesWithRetryDelayWorkflow.next_retry_delay - ) diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 353579e66..9e0575d7e 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -5861,3 +5861,47 @@ async def test_timer_started_after_workflow_completion(client: Client): ) await handle.signal(TimerStartedAfterWorkflowCompletionWorkflow.my_signal) assert await handle.result() == "workflow-result" + + +@activity.defn +async def activity_with_retry_delay(): + raise ApplicationError( + ActivitiesWithRetryDelayWorkflow.error_message, + next_retry_delay=ActivitiesWithRetryDelayWorkflow.next_retry_delay, + ) + + +@workflow.defn +class ActivitiesWithRetryDelayWorkflow: + error_message = "Deliberately failing with next_retry_delay set" + next_retry_delay = timedelta(milliseconds=5) + + @workflow.run + async def run(self) -> None: + await workflow.execute_activity( + activity_with_retry_delay, + retry_policy=RetryPolicy(maximum_attempts=2), + schedule_to_close_timeout=self.next_retry_delay, + ) + + +async def test_activity_retry_delay(client: Client): + async with new_worker( + client, ActivitiesWithRetryDelayWorkflow, activities=[activity_with_retry_delay] + ) as worker: + try: + await client.execute_workflow( + ActivitiesWithRetryDelayWorkflow.run, + id=str(uuid.uuid4()), + task_queue=worker.task_queue, + ) + except WorkflowFailureError as err: + assert isinstance(err.cause, ActivityError) + assert isinstance(err.cause.cause, ApplicationError) + assert ( + str(err.cause.cause) == ActivitiesWithRetryDelayWorkflow.error_message + ) + assert ( + err.cause.cause.next_retry_delay + == ActivitiesWithRetryDelayWorkflow.next_retry_delay + ) From 4b6c7c2e891357242bcc76bd7e1d34a07b8d0b9b Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Mon, 19 Aug 2024 11:42:41 -0400 Subject: [PATCH 8/9] Unflake test --- tests/worker/test_activity.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/tests/worker/test_activity.py b/tests/worker/test_activity.py index c981a98c8..b17a0650f 100644 --- a/tests/worker/test_activity.py +++ b/tests/worker/test_activity.py @@ -744,7 +744,14 @@ async def test_sync_activity_process_non_picklable_heartbeat_details( picklable_activity_non_pickable_heartbeat_details, worker_config={"activity_executor": executor}, ) - assert "Can't pickle" in str(assert_activity_application_error(err.value)) + msg = str(assert_activity_application_error(err.value)) + # TODO: different messages can apparently be produced across runs/platforms + # See e.g. https://github.com/temporalio/sdk-python/actions/runs/10455232879/job/28949714969?pr=571 + assert ( + "Can't pickle" in msg + or "Can't get local object 'picklable_activity_non_pickable_heartbeat_details..'" + in msg + ) async def test_activity_error_non_retryable(client: Client, worker: ExternalWorker): From 355dfc6926ee8336df1424b939835479109e6d84 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Mon, 19 Aug 2024 12:25:52 -0400 Subject: [PATCH 9/9] Fix activity timeout copy paste error (Surprising 5ms passed on anything?) --- tests/worker/test_workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 9e0575d7e..907d30ceb 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -5881,7 +5881,7 @@ async def run(self) -> None: await workflow.execute_activity( activity_with_retry_delay, retry_policy=RetryPolicy(maximum_attempts=2), - schedule_to_close_timeout=self.next_retry_delay, + schedule_to_close_timeout=timedelta(minutes=5), )