-
Notifications
You must be signed in to change notification settings - Fork 131
Make update caller receive update failed error on workflow cancellation #653
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
77382c0
722d9ed
b5b7292
2e38833
c45d9ce
a6d41a5
da6566e
d8a05ab
43ad2c8
b485802
a8ba197
423d2d5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -434,7 +434,6 @@ def is_completion(command): | |
command.HasField("complete_workflow_execution") | ||
or command.HasField("continue_as_new_workflow_execution") | ||
or command.HasField("fail_workflow_execution") | ||
or command.HasField("cancel_workflow_execution") | ||
) | ||
|
||
if any(map(is_completion, self._current_completion.successful.commands)): | ||
|
@@ -518,9 +517,9 @@ async def run_update() -> None: | |
f"Update handler for '{job.name}' expected but not found, and there is no dynamic handler. " | ||
f"known updates: [{' '.join(known_updates)}]" | ||
) | ||
self._in_progress_updates[job.id] = HandlerExecution( | ||
job.name, defn.unfinished_policy, job.id | ||
) | ||
self._in_progress_updates[ | ||
job.id | ||
].unfinished_policy = defn.unfinished_policy | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We are now forced to create the |
||
args = self._process_handler_args( | ||
job.name, | ||
job.input, | ||
|
@@ -571,7 +570,7 @@ async def run_update() -> None: | |
# All asyncio cancelled errors become Temporal cancelled errors | ||
if isinstance(err, asyncio.CancelledError): | ||
err = temporalio.exceptions.CancelledError( | ||
f"Cancellation raised within update {err}" | ||
f"Cancellation raised within update: {err}" | ||
) | ||
# Read-only issues during validation should fail the task | ||
if isinstance(err, temporalio.workflow.ReadOnlyContextError): | ||
|
@@ -606,10 +605,16 @@ async def run_update() -> None: | |
finally: | ||
self._in_progress_updates.pop(job.id, None) | ||
|
||
self.create_task( | ||
task = self.create_task( | ||
run_update(), | ||
name=f"update: {job.name}", | ||
) | ||
self._in_progress_updates[job.id] = HandlerExecution( | ||
job.name, | ||
task, | ||
temporalio.workflow.HandlerUnfinishedPolicy.WARN_AND_ABANDON, | ||
job.id, | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We are now storing |
||
|
||
def _apply_fire_timer( | ||
self, job: temporalio.bridge.proto.workflow_activation.FireTimer | ||
|
@@ -1729,20 +1734,20 @@ def _process_signal_job( | |
signal=job.signal_name, args=args, headers=job.headers | ||
) | ||
|
||
task = self.create_task( | ||
self._run_top_level_workflow_function(self._inbound.handle_signal(input)), | ||
name=f"signal: {job.signal_name}", | ||
) | ||
self._handled_signals_seq += 1 | ||
id = self._handled_signals_seq | ||
self._in_progress_signals[id] = HandlerExecution( | ||
job.signal_name, defn.unfinished_policy | ||
) | ||
|
||
def done_callback(f): | ||
def done_callback(_): | ||
self._in_progress_signals.pop(id, None) | ||
|
||
task = self.create_task( | ||
self._run_top_level_workflow_function(self._inbound.handle_signal(input)), | ||
name=f"signal: {job.signal_name}", | ||
) | ||
task.add_done_callback(done_callback) | ||
self._in_progress_signals[id] = HandlerExecution( | ||
job.signal_name, task, defn.unfinished_policy | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't actually do anything with the signal tasks currently, but we add the |
||
|
||
def _register_task( | ||
self, | ||
|
@@ -1845,6 +1850,13 @@ async def _run_top_level_workflow_function(self, coro: Awaitable[None]) -> None: | |
err | ||
): | ||
self._add_command().cancel_workflow_execution.SetInParent() | ||
# Cancel update tasks, so that the update caller receives an | ||
# update failed error. We do not currently cancel signal tasks | ||
# since (a) doing so would require a workflow flag and (b) the | ||
# presence of the update caller gives a strong reason to cancel | ||
# update tasks. | ||
for update_handler in self._in_progress_updates.values(): | ||
update_handler.task.cancel() | ||
elif self._is_workflow_failure_exception(err): | ||
# All other failure errors fail the workflow | ||
self._set_workflow_failure(err) | ||
|
@@ -2811,6 +2823,7 @@ class HandlerExecution: | |
"""Information about an execution of a signal or update handler.""" | ||
|
||
name: str | ||
task: asyncio.Task[None] | ||
unfinished_policy: temporalio.workflow.HandlerUnfinishedPolicy | ||
id: Optional[str] = None | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5584,8 +5584,10 @@ class _UnfinishedHandlersOnWorkflowTerminationTest: | |
async def test_warning_is_issued_on_exit_with_unfinished_handler( | ||
self, | ||
): | ||
assert await self._run_workflow_and_get_warning() == ( | ||
warning_emitted = await self._run_workflow_and_get_warning() | ||
assert warning_emitted == ( | ||
self.handler_waiting == "-no-wait-all-handlers-finish-" | ||
and self.workflow_termination_type != "-cancellation-" | ||
) | ||
|
||
async def _run_workflow_and_get_warning(self) -> bool: | ||
|
@@ -5646,13 +5648,22 @@ async def _run_workflow_and_get_warning(self) -> bool: | |
with pytest.WarningsRecorder() as warnings: | ||
if self.handler_type == "-update-": | ||
assert update_task | ||
if self.handler_waiting == "-wait-all-handlers-finish-": | ||
|
||
if self.workflow_termination_type == "-cancellation-": | ||
with pytest.raises(WorkflowUpdateFailedError) as update_err: | ||
await update_task | ||
assert isinstance(update_err.value.cause, CancelledError) | ||
assert ( | ||
"the workflow was cancelled" | ||
in str(update_err.value.cause).lower() | ||
) | ||
elif self.handler_waiting == "-wait-all-handlers-finish-": | ||
await update_task | ||
else: | ||
with pytest.raises(RPCError) as update_err: | ||
with pytest.raises(RPCError) as rpc_err: | ||
await update_task | ||
assert update_err.value.status == RPCStatusCode.NOT_FOUND and ( | ||
str(update_err.value).lower() | ||
assert rpc_err.value.status == RPCStatusCode.NOT_FOUND and ( | ||
str(rpc_err.value).lower() | ||
== "workflow execution already completed" | ||
) | ||
|
||
|
@@ -6146,3 +6157,45 @@ async def test_workflow_run_sees_workflow_init(client: Client): | |
task_queue=worker.task_queue, | ||
) | ||
assert workflow_result == "hello, world" | ||
|
||
|
||
@workflow.defn | ||
class UpdateCancellationWorkflow: | ||
def __init__(self) -> None: | ||
self.non_terminating_operation_has_started = False | ||
|
||
@workflow.run | ||
async def run(self) -> None: | ||
await asyncio.Future() | ||
|
||
@workflow.update(unfinished_policy=workflow.HandlerUnfinishedPolicy.ABANDON) | ||
async def non_terminating_update(self) -> None: | ||
self.non_terminating_operation_has_started = True | ||
await asyncio.Future() | ||
|
||
@workflow.update | ||
async def wait_until_non_terminating_operation_has_started(self) -> None: | ||
await workflow.wait_condition( | ||
lambda: self.non_terminating_operation_has_started | ||
) | ||
|
||
|
||
async def test_update_cancellation(client: Client): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. May be worth having a test for doing a cleanup step on cancellation (e.g. in a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see, good point. So to restate what you're saying:
That is true today: we don't cancel signals or updates on workflow cancellation, but a user could write a workflow that does, and they would escape the warning, because the handler would not be alive at check time. But my PR breaks it, by attempting to make the check at workflow cancellation time: it would classify a handler as unfinished when in fact it's about to be gracefully terminated. The reason my PR does that is that it wanted to continue to warn the workflow author when workflow cancellation causes an update cancellation. And the reason it wants to do that is to maintain consistency with signals: when the workflow is canceled, if we're going to warn on unfinished signal handlers (which we do) then we should warn on unfinished update handlers also. I think that these desires are mutually incompatible. We have to allow a user to clean up their update gracefully without the warning, as you point out. I see two solutions:
There is a strong incentive (caller UX) for automatically canceling update handlers on workflow cancellation. We have never automatically cancelled signal handlers in the Python SDK, and doing so would require a flag as it may be backwards incompatible with replay of existing histories. The inconsistency in cancelling update handlers but not signal handlers would only be a problem if there is a potential for observable side effects. I'll carry on thinking about it but for now I've updated the PR to stop issuing the unfinished warning on workflow cancellation, for both signal and update. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I think we should warn on unfinished handlers upon workflow completion regardless of reason for completion and regardless of reason handlers are unfinished. I think the problem with the PR before was it was warning before cancellation is processed and I think the previous behavior of warning after cancellation/completion is processed is best. So I think the line at #653 (comment) should not be removed. It does mean it's a pain for users doing "cleanup" in update handlers, but IMO that's an education issue. We have to educate them that they should not exit the primary run function until they have done cleanup in handlers. This is similar to process exiting before an async thing has been given a chance to clean up. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're suggesting that we continue to do the check late, and warn on workflow cancellation. We can't do that, because in the common case we will then warn on unfinished signals (since these are not automatically cancelled, and hence the handlers will still be alive) but not warn on unfinished updates (since these are automatically cancelled, and hence the handlers will be dead). (I did try to explain that in the comment above #653 (comment), but it is turning out to be a slightly tricky subject! And my comment isn't very clear). This is why I've switched the PR to no longer warn on cancellation. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To confirm from our off-PR discussion, we are still warning on unfinished handler regardless of reason (i.e. cancellation)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, still thinking about the design here. I've put the PR into draft mode for now. |
||
async with new_worker(client, UpdateCancellationWorkflow) as worker: | ||
wf_handle = await client.start_workflow( | ||
UpdateCancellationWorkflow.run, | ||
id=str(uuid.uuid4()), | ||
task_queue=worker.task_queue, | ||
) | ||
# Asynchronously run an update that will never complete | ||
non_terminating_update = asyncio.create_task( | ||
wf_handle.execute_update(UpdateCancellationWorkflow.non_terminating_update) | ||
) | ||
# Wait until we know the update handler has started executing | ||
await wf_handle.execute_update( | ||
UpdateCancellationWorkflow.wait_until_non_terminating_operation_has_started | ||
) | ||
# Cancel the workflow and confirm that update caller sees update failed | ||
await wf_handle.cancel() | ||
with pytest.raises(WorkflowUpdateFailedError): | ||
await non_terminating_update # type: ignore |
Uh oh!
There was an error while loading. Please reload this page.