From f6cd0eb3c12fa3b30b1af357d1aba5b4d28fbee6 Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Wed, 13 Jul 2022 07:27:15 -0500 Subject: [PATCH 1/2] Properly handle uncaught child/activity cancel during workflow cancel Fixes #70 --- temporalio/worker/workflow_instance.py | 16 ++++++++ tests/worker/test_workflow.py | 52 ++++++++++++++++++++++++++ 2 files changed, 68 insertions(+) diff --git a/temporalio/worker/workflow_instance.py b/temporalio/worker/workflow_instance.py index 455b764ab..0b69c4f0f 100644 --- a/temporalio/worker/workflow_instance.py +++ b/temporalio/worker/workflow_instance.py @@ -149,6 +149,7 @@ def __init__(self, det: WorkflowInstanceDetails) -> None: self._info = det.info self._primary_task: Optional[asyncio.Task[None]] = None self._time = 0.0 + self._cancel_requested = False # Handles which are ready to run on the next event loop iteration self._ready: Deque[asyncio.Handle] = collections.deque() self._conditions: List[Tuple[Callable[[], bool], asyncio.Future]] = [] @@ -343,6 +344,7 @@ def _apply( def _apply_cancel_workflow( self, job: temporalio.bridge.proto.workflow_activation.CancelWorkflow ) -> None: + self._cancel_requested = True # TODO(cretz): Details or cancel message or whatever? if self._primary_task: self._primary_task.cancel() @@ -1121,6 +1123,20 @@ async def _run_top_level_workflow_function(self, coro: Awaitable[None]) -> None: f"Workflow raised failure with run ID {self._info.run_id}", exc_info=True, ) + # If a cancel was requested, and the failure is from an activity or + # child, and its cause was a cancellation, we want to use that cause + # instead because it means a cancel bubbled up while waiting on an + # activity or child. + if ( + self._cancel_requested + and ( + isinstance(err, temporalio.exceptions.ActivityError) + or isinstance(err, temporalio.exceptions.ChildWorkflowError) + ) + and isinstance(err.cause, temporalio.exceptions.CancelledError) + ): + err = err.cause + command = self._add_command() command.fail_workflow_execution.failure.SetInParent() try: diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index e8ae772a4..435ad0d3f 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -676,6 +676,58 @@ async def started() -> bool: assert isinstance(err.value.cause, CancelledError) +@activity.defn +async def wait_forever() -> NoReturn: + await asyncio.Future() + raise RuntimeError("Unreachable") + + +@workflow.defn +class UncaughtCancelWorkflow: + @workflow.run + async def run(self, activity: bool) -> NoReturn: + self._started = True + # Wait forever on activity or child workflow + if activity: + await workflow.execute_activity( + wait_forever, start_to_close_timeout=timedelta(seconds=1000) + ) + else: + await workflow.execute_child_workflow( + UncaughtCancelWorkflow.run, + True, + id=f"{workflow.info().workflow_id}_child", + ) + + @workflow.query + def started(self) -> bool: + return self._started + + +@pytest.mark.parametrize("activity", [True, False]) +async def test_workflow_uncaught_cancel(client: Client, activity: bool): + async with new_worker( + client, UncaughtCancelWorkflow, activities=[wait_forever] + ) as worker: + # Start workflow waiting on activity or child workflow, cancel it, and + # confirm the workflow is shown as cancelled + handle = await client.start_workflow( + UncaughtCancelWorkflow.run, + activity, + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + + async def started() -> bool: + return await handle.query(UncaughtCancelWorkflow.started) + + await assert_eq_eventually(True, started) + await handle.cancel() + with pytest.raises(WorkflowFailureError) as err: + await handle.result() + assert isinstance(err.value.cause, CancelledError) + + @workflow.defn class CancelChildWorkflow: def __init__(self) -> None: From 5f374425295462c681a03d56813079de9512ed33 Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Wed, 13 Jul 2022 09:13:28 -0500 Subject: [PATCH 2/2] Test fix --- tests/worker/test_workflow.py | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 435ad0d3f..7b414e0e0 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -45,7 +45,13 @@ ) from temporalio.bridge.proto.workflow_activation import WorkflowActivation from temporalio.bridge.proto.workflow_completion import WorkflowActivationCompletion -from temporalio.client import Client, WorkflowFailureError, WorkflowHandle +from temporalio.client import ( + Client, + RPCError, + RPCStatusCode, + WorkflowFailureError, + WorkflowHandle, +) from temporalio.common import RetryPolicy, SearchAttributes from temporalio.converter import DataConverter, PayloadCodec, decode_search_attributes from temporalio.exceptions import ( @@ -770,13 +776,19 @@ async def test_workflow_cancel_child_started(client: Client, use_execute: bool): ) # Wait until child started async def child_started() -> bool: - return await handle.query( - CancelChildWorkflow.ready - ) and await client.get_workflow_handle_for( - LongSleepWorkflow.run, workflow_id=f"{handle.id}_child" - ).query( - LongSleepWorkflow.started - ) + try: + return await handle.query( + CancelChildWorkflow.ready + ) and await client.get_workflow_handle_for( + LongSleepWorkflow.run, workflow_id=f"{handle.id}_child" + ).query( + LongSleepWorkflow.started + ) + except RPCError as err: + # Ignore not-found because child may not have started yet + if err.status == RPCStatusCode.NOT_FOUND: + return False + raise await assert_eq_eventually(True, child_started) # Send cancel signal and wait on the handle