From 77382c08074b0023833514b32735de6177392da1 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Tue, 1 Oct 2024 06:25:03 -0400 Subject: [PATCH 01/12] Add documentation of signal and update handlers --- README.md | 68 ++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 52 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 34eed9c3d..2206e108a 100644 --- a/README.md +++ b/README.md @@ -65,6 +65,7 @@ informal introduction to the features and their implementation. - [Asyncio Cancellation](#asyncio-cancellation) - [Workflow Utilities](#workflow-utilities) - [Exceptions](#exceptions) + - [Signal and update handlers](#signal-and-update-handlers) - [External Workflows](#external-workflows) - [Testing](#testing) - [Automatic Time Skipping](#automatic-time-skipping) @@ -581,28 +582,35 @@ Here are the decorators that can be applied: * The purpose of this decorator is to allow operations involving workflow arguments to be performed in the `__init__` method, before any signal or update handler has a chance to execute. * `@workflow.signal` - Defines a method as a signal - * Can be defined on an `async` or non-`async` function at any hierarchy depth, but if decorated method is overridden, - the override must also be decorated - * The method's arguments are the signal's arguments - * Can have a `name` param to customize the signal name, otherwise it defaults to the unqualified method name + * Can be defined on an `async` or non-`async` method at any point in the class hierarchy, but if the decorated method + is overridden, then the override must also be decorated. + * The method's arguments are the signal's arguments. + * Return value is ignored. + * May mutate workflow state, and make calls to other workflow APIs like starting activities, etc. + * Can have a `name` param to customize the signal name, otherwise it defaults to the unqualified method name. * Can have `dynamic=True` which means all otherwise unhandled signals fall through to this. If present, cannot have `name` argument, and method parameters must be `self`, a string signal name, and a `Sequence[temporalio.common.RawValue]`. * Non-dynamic method can only have positional arguments. Best practice is to only take a single argument that is an object/dataclass of fields that can be added to as needed. - * Return value is ignored -* `@workflow.query` - Defines a method as a query - * All the same constraints as `@workflow.signal` but should return a value - * Should not be `async` - * Temporal queries should never mutate anything in the workflow or call any calls that would mutate the workflow + * See [Signal and update handlers](#signal-and-update-handlers) below * `@workflow.update` - Defines a method as an update - * May both accept as input and return a value + * Can be defined on an `async` or non-`async` method at any point in the class hierarchy, but if the decorated method + is overridden, then the override must also be decorated. + * May accept input and return a value + * The method's arguments are the update's arguments. * May be `async` or non-`async` * May mutate workflow state, and make calls to other workflow APIs like starting activities, etc. - * Also accepts the `name` and `dynamic` parameters like signals and queries, with the same semantics. + * Also accepts the `name` and `dynamic` parameters like signal, with the same semantics. * Update handlers may optionally define a validator method by decorating it with `@update_handler_method.validator`. To reject an update before any events are written to history, throw an exception in a validator. Validators cannot be `async`, cannot mutate workflow state, and return nothing. + * See [Signal and update handlers](#signal-and-update-handlers) below +* `@workflow.query` - Defines a method as a query + * Should return a value + * Should not be `async` + * Temporal queries should never mutate anything in the workflow or call any calls that would mutate the workflow + * Also accepts the `name` and `dynamic` parameters like signal and update, with the same semantics. #### Running @@ -705,9 +713,15 @@ deterministic: #### Asyncio Cancellation -Cancellation is done the same way as `asyncio`. Specifically, a task can be requested to be cancelled but does not -necessarily have to respect that cancellation immediately. This also means that `asyncio.shield()` can be used to -protect against cancellation. The following tasks, when cancelled, perform a Temporal cancellation: +Cancellation is done using `asyncio` [task cancellation](https://docs.python.org/3/library/asyncio-task.html#task-cancellation). +This means that tasks are requested to be cancelled but can catch the +[`asyncio.CancelledError`](https://docs.python.org/3/library/asyncio-exceptions.html#asyncio.CancelledError), thus +allowing them to perform some cleanup before allowing the cancellation to proceed (i.e. re-raising the error), or to +deny the cancellation entirely. It also means that +[`asyncio.shield()`](https://docs.python.org/3/library/asyncio-task.html#shielding-from-cancellation) can be used to +protect tasks against cancellation. + +The following tasks, when cancelled, perform a Temporal cancellation: * Activities - when the task executing an activity is cancelled, a cancellation request is sent to the activity * Child workflows - when the task starting or executing a child workflow is cancelled, a cancellation request is sent to @@ -746,17 +760,39 @@ While running in a workflow, in addition to features documented elsewhere, the f be marked non-retryable or include details as needed. * Other exceptions that come from activity execution, child execution, cancellation, etc are already instances of `FailureError` and will fail the workflow when uncaught. +* Update handlers are special: an instance of `temporalio.exceptions.FailureError` raised in an update handler will fail + the update instead of failing the workflow. * All other exceptions fail the "workflow task" which means the workflow will continually retry until the workflow is fixed. This is helpful for bad code or other non-predictable exceptions. To actually fail the workflow, use an `ApplicationError` as mentioned above. This default can be changed by providing a list of exception types to `workflow_failure_exception_types` when creating a `Worker` or `failure_exception_types` on the `@workflow.defn` decorator. If a workflow-thrown exception is an instance -of any type in either list, it will fail the workflow instead of the task. This means a value of `[Exception]` will -cause every exception to fail the workflow instead of the task. Also, as a special case, if +of any type in either list, it will fail the workflow (or update) instead of the workflow task. This means a value of +`[Exception]` will cause every exception to fail the workflow instead of the workflow task. Also, as a special case, if `temporalio.workflow.NondeterminismError` (or any superclass of it) is set, non-deterministic exceptions will fail the workflow. WARNING: These settings are experimental. +#### Signal and update handlers + +Signal and update handlers are defined using decorated methods as shown in the example [above](#definition). Client code +sends signals and updates using `workflow_handle.signal`, `workflow_handle.execute_update`, or +`workflow_handle.start_update`. When the workflow receives one of these requests, it starts an `asyncio.Task` executing +the corresponding handler method with the argument(s) from the request. + +The handler methods may be `async def` and can do all the async operations described above (e.g. invoking activities and +child workflows, and waiting on timers and conditions). Notice that this means that handler tasks will be executing +concurrently with respect to each other and the main workflow task. Use +[asyncio.Lock](https://docs.python.org/3/library/asyncio-sync.html#lock) and +[asyncio.Semaphore](https://docs.python.org/3/library/asyncio-sync.html#semaphore) if necessary. + +Your main workflow task may finish as a result of successful completion, cancellation, continue-as-new, or failure. You +should ensure that all in-progress signal and update handler tasks have finished before this happens; if you do not, you +will see a warning (the warning can be disabled via the `workflow.signal`/`workflow.update` decorators). One way to +ensure that handler tasks have finished is to wait on the `workflow.all_handlers_finished` condition: +```python +await workflow.wait_condition(workflow.all_handlers_finished) +``` #### External Workflows * `workflow.get_external_workflow_handle()` inside a workflow returns a handle to interact with another workflow From 722d9edb62b2ae28fe1893e127dd34b37fa4022a Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 25 Sep 2024 06:58:36 -0400 Subject: [PATCH 02/12] SDK-2453 Failing test of update and workflow cancellation --- tests/worker/test_workflow.py | 42 +++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 15afe8c46..a3e31e66a 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -6146,3 +6146,45 @@ async def test_workflow_run_sees_workflow_init(client: Client): task_queue=worker.task_queue, ) assert workflow_result == "hello, world" + + +@workflow.defn +class UpdateCancellationWorkflow: + def __init__(self) -> None: + self.non_terminating_operation_has_started = False + + @workflow.run + async def run(self) -> NoReturn: + await asyncio.Future() + + @workflow.update(unfinished_policy=workflow.HandlerUnfinishedPolicy.ABANDON) + async def non_terminating_update(self) -> NoReturn: + self.non_terminating_operation_has_started = True + await asyncio.Future() + + @workflow.update + async def wait_until_non_terminating_operation_has_started(self) -> None: + await workflow.wait_condition( + lambda: self.non_terminating_operation_has_started + ) + + +async def test_update_cancellation(client: Client): + async with new_worker(client, UpdateCancellationWorkflow) as worker: + wf_handle = await client.start_workflow( + UpdateCancellationWorkflow.run, + id=str(uuid.uuid4()), + task_queue=worker.task_queue, + ) + # Asynchronously run an update that will never complete + non_terminating_update = asyncio.create_task( + wf_handle.execute_update(UpdateCancellationWorkflow.non_terminating_update) + ) + # Wait until we know the update handler has started executing + await wf_handle.execute_update( + UpdateCancellationWorkflow.wait_until_non_terminating_operation_has_started + ) + # Cancel the workflow and confirm that update caller sees update failed + await wf_handle.cancel() + with pytest.raises(WorkflowUpdateFailedError): + await non_terminating_update From b5b729202d01970f81631ffb73cab1a8001cd591 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 27 Sep 2024 05:15:29 -0400 Subject: [PATCH 03/12] Refactor and add asyncio.Task to HandlerExecution dataclass --- temporalio/worker/_workflow_instance.py | 33 +++++++++++++++---------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index 1ca70a230..a7b0586ba 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -518,9 +518,9 @@ async def run_update() -> None: f"Update handler for '{job.name}' expected but not found, and there is no dynamic handler. " f"known updates: [{' '.join(known_updates)}]" ) - self._in_progress_updates[job.id] = HandlerExecution( - job.name, defn.unfinished_policy, job.id - ) + self._in_progress_updates[ + job.id + ].unfinished_policy = defn.unfinished_policy args = self._process_handler_args( job.name, job.input, @@ -571,7 +571,7 @@ async def run_update() -> None: # All asyncio cancelled errors become Temporal cancelled errors if isinstance(err, asyncio.CancelledError): err = temporalio.exceptions.CancelledError( - f"Cancellation raised within update {err}" + f"Cancellation raised within update: {err}" ) # Read-only issues during validation should fail the task if isinstance(err, temporalio.workflow.ReadOnlyContextError): @@ -606,10 +606,16 @@ async def run_update() -> None: finally: self._in_progress_updates.pop(job.id, None) - self.create_task( + task = self.create_task( run_update(), name=f"update: {job.name}", ) + self._in_progress_updates[job.id] = HandlerExecution( + job.name, + task, + temporalio.workflow.HandlerUnfinishedPolicy.WARN_AND_ABANDON, + job.id, + ) def _apply_fire_timer( self, job: temporalio.bridge.proto.workflow_activation.FireTimer @@ -1729,20 +1735,20 @@ def _process_signal_job( signal=job.signal_name, args=args, headers=job.headers ) + task = self.create_task( + self._run_top_level_workflow_function(self._inbound.handle_signal(input)), + name=f"signal: {job.signal_name}", + ) self._handled_signals_seq += 1 id = self._handled_signals_seq - self._in_progress_signals[id] = HandlerExecution( - job.signal_name, defn.unfinished_policy - ) - def done_callback(f): + def done_callback(_): self._in_progress_signals.pop(id, None) - task = self.create_task( - self._run_top_level_workflow_function(self._inbound.handle_signal(input)), - name=f"signal: {job.signal_name}", - ) task.add_done_callback(done_callback) + self._in_progress_signals[id] = HandlerExecution( + job.signal_name, task, defn.unfinished_policy + ) def _register_task( self, @@ -2811,6 +2817,7 @@ class HandlerExecution: """Information about an execution of a signal or update handler.""" name: str + task: asyncio.Task[None] unfinished_policy: temporalio.workflow.HandlerUnfinishedPolicy id: Optional[str] = None From 2e3883363d8b46fea813ba22e4bc8a1d2c838d81 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 27 Sep 2024 05:15:55 -0400 Subject: [PATCH 04/12] Cancel update tasks on workflow cancellation --- temporalio/worker/_workflow_instance.py | 7 +++++++ tests/worker/test_workflow.py | 11 ++++++++++- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index a7b0586ba..bb2c05237 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -1851,6 +1851,13 @@ async def _run_top_level_workflow_function(self, coro: Awaitable[None]) -> None: err ): self._add_command().cancel_workflow_execution.SetInParent() + # Cancel update tasks, so that the update caller receives an + # update failed error. We do not currently cancel signal tasks + # since (a) doing so would require a workflow flag and (b) the + # presence of the update caller gives a strong reason to cancel + # update tasks. + for update_handler in self._in_progress_updates.values(): + update_handler.task.cancel("The workflow was cancelled.") elif self._is_workflow_failure_exception(err): # All other failure errors fail the workflow self._set_workflow_failure(err) diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index a3e31e66a..052a28bf1 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -5646,7 +5646,16 @@ async def _run_workflow_and_get_warning(self) -> bool: with pytest.WarningsRecorder() as warnings: if self.handler_type == "-update-": assert update_task - if self.handler_waiting == "-wait-all-handlers-finish-": + + if self.workflow_termination_type == "-cancellation-": + with pytest.raises(WorkflowUpdateFailedError) as update_err: + await update_task + assert isinstance(update_err.value.cause, CancelledError) + assert ( + "the workflow was cancelled" + in str(update_err.value.cause).lower() + ) + elif self.handler_waiting == "-wait-all-handlers-finish-": await update_task else: with pytest.raises(RPCError) as update_err: From c45d9ce471f92cab5f280ac962c96ec4c085291b Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 27 Sep 2024 07:54:05 -0400 Subject: [PATCH 05/12] Emit unfinished handler warnings separately for cancellation --- temporalio/worker/_workflow_instance.py | 12 +++++++++--- tests/worker/test_workflow.py | 13 ++++++++++--- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index bb2c05237..bc64fe923 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -429,15 +429,20 @@ def activate( f"Failed converting activation exception: {inner_err}" ) - def is_completion(command): + def is_non_cancellation_completion(command): return ( command.HasField("complete_workflow_execution") or command.HasField("continue_as_new_workflow_execution") or command.HasField("fail_workflow_execution") - or command.HasField("cancel_workflow_execution") ) - if any(map(is_completion, self._current_completion.successful.commands)): + # We do also warn in the case of workflow cancellation, but this is done + # when handling the workflow cancellation, since we also cancel update + # handlers at that time. + if any( + is_non_cancellation_completion(c) + for c in self._current_completion.successful.commands + ): self._warn_if_unfinished_handlers() return self._current_completion @@ -1851,6 +1856,7 @@ async def _run_top_level_workflow_function(self, coro: Awaitable[None]) -> None: err ): self._add_command().cancel_workflow_execution.SetInParent() + self._warn_if_unfinished_handlers() # Cancel update tasks, so that the update caller receives an # update failed error. We do not currently cancel signal tasks # since (a) doing so would require a workflow flag and (b) the diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 052a28bf1..a9e7af992 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -5584,9 +5584,16 @@ class _UnfinishedHandlersOnWorkflowTerminationTest: async def test_warning_is_issued_on_exit_with_unfinished_handler( self, ): - assert await self._run_workflow_and_get_warning() == ( - self.handler_waiting == "-no-wait-all-handlers-finish-" - ) + warning_emitted = await self._run_workflow_and_get_warning() + if self.workflow_termination_type == "-cancellation-": + # All paths through this test for which the workflow is cancelled result + # in the warning being emitted. + assert warning_emitted + else: + # Otherwise, the warning is emitted iff the workflow does not wait for handlers to finish. + assert warning_emitted == ( + self.handler_waiting == "-no-wait-all-handlers-finish-" + ) async def _run_workflow_and_get_warning(self) -> bool: workflow_id = f"wf-{uuid.uuid4()}" From a6d41a5bbf65582996cbba2f66d34eeecb1627de Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 27 Sep 2024 10:47:31 -0400 Subject: [PATCH 06/12] Strengthen test assertion (count instead of presence) --- tests/worker/test_workflow.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index a9e7af992..fce172f7b 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -5584,18 +5584,18 @@ class _UnfinishedHandlersOnWorkflowTerminationTest: async def test_warning_is_issued_on_exit_with_unfinished_handler( self, ): - warning_emitted = await self._run_workflow_and_get_warning() + warnings_emitted = await self._run_workflow_and_get_warning() if self.workflow_termination_type == "-cancellation-": # All paths through this test for which the workflow is cancelled result # in the warning being emitted. - assert warning_emitted + assert warnings_emitted == 1 else: # Otherwise, the warning is emitted iff the workflow does not wait for handlers to finish. - assert warning_emitted == ( - self.handler_waiting == "-no-wait-all-handlers-finish-" + assert warnings_emitted == ( + 1 if self.handler_waiting == "-no-wait-all-handlers-finish-" else 0 ) - async def _run_workflow_and_get_warning(self) -> bool: + async def _run_workflow_and_get_warning(self) -> int: workflow_id = f"wf-{uuid.uuid4()}" update_id = "update-id" task_queue = "tq" @@ -5688,9 +5688,10 @@ async def _run_workflow_and_get_warning(self) -> bool: == "Deliberately failing post-ContinueAsNew run" ) - unfinished_handler_warning_emitted = any( - issubclass(w.category, self._unfinished_handler_warning_cls) + unfinished_handler_warning_emitted = sum( + 1 for w in warnings + if issubclass(w.category, self._unfinished_handler_warning_cls) ) return unfinished_handler_warning_emitted From da6566e530f59d70d7b94bd35ae757b463033211 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 27 Sep 2024 15:27:24 -0400 Subject: [PATCH 07/12] Revert "Strengthen test assertion (count instead of presence)" This reverts commit a9600e37fa16f0b0a296b6d85ab05935fb9b3d59. --- tests/worker/test_workflow.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index fce172f7b..a9e7af992 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -5584,18 +5584,18 @@ class _UnfinishedHandlersOnWorkflowTerminationTest: async def test_warning_is_issued_on_exit_with_unfinished_handler( self, ): - warnings_emitted = await self._run_workflow_and_get_warning() + warning_emitted = await self._run_workflow_and_get_warning() if self.workflow_termination_type == "-cancellation-": # All paths through this test for which the workflow is cancelled result # in the warning being emitted. - assert warnings_emitted == 1 + assert warning_emitted else: # Otherwise, the warning is emitted iff the workflow does not wait for handlers to finish. - assert warnings_emitted == ( - 1 if self.handler_waiting == "-no-wait-all-handlers-finish-" else 0 + assert warning_emitted == ( + self.handler_waiting == "-no-wait-all-handlers-finish-" ) - async def _run_workflow_and_get_warning(self) -> int: + async def _run_workflow_and_get_warning(self) -> bool: workflow_id = f"wf-{uuid.uuid4()}" update_id = "update-id" task_queue = "tq" @@ -5688,10 +5688,9 @@ async def _run_workflow_and_get_warning(self) -> int: == "Deliberately failing post-ContinueAsNew run" ) - unfinished_handler_warning_emitted = sum( - 1 + unfinished_handler_warning_emitted = any( + issubclass(w.category, self._unfinished_handler_warning_cls) for w in warnings - if issubclass(w.category, self._unfinished_handler_warning_cls) ) return unfinished_handler_warning_emitted From d8a05abcac2676aebab3cfaf98032fff74c3652c Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 27 Sep 2024 15:27:26 -0400 Subject: [PATCH 08/12] Revert "Emit unfinished handler warnings separately for cancellation" This reverts commit 87f540e0bc423ec9cab030320efb157d73d270c0. --- temporalio/worker/_workflow_instance.py | 12 +++--------- tests/worker/test_workflow.py | 13 +++---------- 2 files changed, 6 insertions(+), 19 deletions(-) diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index bc64fe923..bb2c05237 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -429,20 +429,15 @@ def activate( f"Failed converting activation exception: {inner_err}" ) - def is_non_cancellation_completion(command): + def is_completion(command): return ( command.HasField("complete_workflow_execution") or command.HasField("continue_as_new_workflow_execution") or command.HasField("fail_workflow_execution") + or command.HasField("cancel_workflow_execution") ) - # We do also warn in the case of workflow cancellation, but this is done - # when handling the workflow cancellation, since we also cancel update - # handlers at that time. - if any( - is_non_cancellation_completion(c) - for c in self._current_completion.successful.commands - ): + if any(map(is_completion, self._current_completion.successful.commands)): self._warn_if_unfinished_handlers() return self._current_completion @@ -1856,7 +1851,6 @@ async def _run_top_level_workflow_function(self, coro: Awaitable[None]) -> None: err ): self._add_command().cancel_workflow_execution.SetInParent() - self._warn_if_unfinished_handlers() # Cancel update tasks, so that the update caller receives an # update failed error. We do not currently cancel signal tasks # since (a) doing so would require a workflow flag and (b) the diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index a9e7af992..052a28bf1 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -5584,16 +5584,9 @@ class _UnfinishedHandlersOnWorkflowTerminationTest: async def test_warning_is_issued_on_exit_with_unfinished_handler( self, ): - warning_emitted = await self._run_workflow_and_get_warning() - if self.workflow_termination_type == "-cancellation-": - # All paths through this test for which the workflow is cancelled result - # in the warning being emitted. - assert warning_emitted - else: - # Otherwise, the warning is emitted iff the workflow does not wait for handlers to finish. - assert warning_emitted == ( - self.handler_waiting == "-no-wait-all-handlers-finish-" - ) + assert await self._run_workflow_and_get_warning() == ( + self.handler_waiting == "-no-wait-all-handlers-finish-" + ) async def _run_workflow_and_get_warning(self) -> bool: workflow_id = f"wf-{uuid.uuid4()}" From 43ad2c89149c865b03def9033560e85bb51bc749 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 27 Sep 2024 17:53:20 -0400 Subject: [PATCH 09/12] Don't emit unfinished warnings on workflow cancellation --- temporalio/worker/_workflow_instance.py | 1 - tests/worker/test_workflow.py | 4 +++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index bb2c05237..a3a551acd 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -434,7 +434,6 @@ def is_completion(command): command.HasField("complete_workflow_execution") or command.HasField("continue_as_new_workflow_execution") or command.HasField("fail_workflow_execution") - or command.HasField("cancel_workflow_execution") ) if any(map(is_completion, self._current_completion.successful.commands)): diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 052a28bf1..74efe7bc0 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -5584,8 +5584,10 @@ class _UnfinishedHandlersOnWorkflowTerminationTest: async def test_warning_is_issued_on_exit_with_unfinished_handler( self, ): - assert await self._run_workflow_and_get_warning() == ( + warning_emitted = await self._run_workflow_and_get_warning() + assert warning_emitted == ( self.handler_waiting == "-no-wait-all-handlers-finish-" + and self.workflow_termination_type != "-cancellation-" ) async def _run_workflow_and_get_warning(self) -> bool: From b4858024784011f8f60d3a7c650c6cbcfc101b56 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 27 Sep 2024 18:25:28 -0400 Subject: [PATCH 10/12] Fix mypy errors --- tests/worker/test_workflow.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 74efe7bc0..8ed4ed464 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -5660,10 +5660,10 @@ async def _run_workflow_and_get_warning(self) -> bool: elif self.handler_waiting == "-wait-all-handlers-finish-": await update_task else: - with pytest.raises(RPCError) as update_err: + with pytest.raises(RPCError) as rpc_err: await update_task - assert update_err.value.status == RPCStatusCode.NOT_FOUND and ( - str(update_err.value).lower() + assert rpc_err.value.status == RPCStatusCode.NOT_FOUND and ( + str(rpc_err.value).lower() == "workflow execution already completed" ) @@ -6165,11 +6165,11 @@ def __init__(self) -> None: self.non_terminating_operation_has_started = False @workflow.run - async def run(self) -> NoReturn: + async def run(self) -> None: await asyncio.Future() @workflow.update(unfinished_policy=workflow.HandlerUnfinishedPolicy.ABANDON) - async def non_terminating_update(self) -> NoReturn: + async def non_terminating_update(self) -> None: self.non_terminating_operation_has_started = True await asyncio.Future() @@ -6198,4 +6198,4 @@ async def test_update_cancellation(client: Client): # Cancel the workflow and confirm that update caller sees update failed await wf_handle.cancel() with pytest.raises(WorkflowUpdateFailedError): - await non_terminating_update + await non_terminating_update # type: ignore From a8ba197ce316cbd21308153cf4b588152e4f3dc3 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Mon, 30 Sep 2024 06:24:09 -0400 Subject: [PATCH 11/12] 3.8 compatibility --- temporalio/worker/_workflow_instance.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index a3a551acd..23964c606 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -1856,7 +1856,7 @@ async def _run_top_level_workflow_function(self, coro: Awaitable[None]) -> None: # presence of the update caller gives a strong reason to cancel # update tasks. for update_handler in self._in_progress_updates.values(): - update_handler.task.cancel("The workflow was cancelled.") + update_handler.task.cancel() elif self._is_workflow_failure_exception(err): # All other failure errors fail the workflow self._set_workflow_failure(err) From 423d2d5387d1c52867dff5b0f75e84b277b423f5 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Tue, 1 Oct 2024 06:25:38 -0400 Subject: [PATCH 12/12] Document cancellation of update handlers --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index 2206e108a..6eb13e501 100644 --- a/README.md +++ b/README.md @@ -793,6 +793,10 @@ ensure that handler tasks have finished is to wait on the `workflow.all_handlers ```python await workflow.wait_condition(workflow.all_handlers_finished) ``` + +If your main workflow task finishes as a result of cancellation, then any in-progress update handler tasks will be +automatically requested to cancel. + #### External Workflows * `workflow.get_external_workflow_handle()` inside a workflow returns a handle to interact with another workflow