Skip to content

Commit 8e1aaf7

Browse files
committed
Test cancellation
1 parent 6eb9bcc commit 8e1aaf7

File tree

1 file changed

+119
-0
lines changed

1 file changed

+119
-0
lines changed

tests/worker/test_workflow.py

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5357,3 +5357,122 @@ def _unfinished_handler_warning_cls(self) -> Type:
53575357
"update": workflow.UnfinishedUpdateHandlersWarning,
53585358
"signal": workflow.UnfinishedSignalHandlersWarning,
53595359
}[self.handler_type]
5360+
5361+
5362+
@workflow.defn
5363+
class UnfinishedHandlersWithCancellationWorkflow:
5364+
@workflow.run
5365+
async def run(self) -> NoReturn:
5366+
await workflow.wait_condition(lambda: False)
5367+
5368+
@workflow.update
5369+
async def my_update(self) -> str:
5370+
await workflow.wait_condition(lambda: False)
5371+
return "update-result"
5372+
5373+
@workflow.signal
5374+
async def my_signal(self):
5375+
await workflow.wait_condition(lambda: False)
5376+
5377+
5378+
async def test_unfinished_update_handler_with_workflow_cancellation(client: Client):
5379+
await _UnfinishedHandlersWithCancellationTest(
5380+
client, "update"
5381+
).test_warning_is_issued_when_cancellation_causes_exit_with_unfinished_handler()
5382+
5383+
5384+
async def test_unfinished_signal_handler_with_workflow_cancellation(client: Client):
5385+
await _UnfinishedHandlersWithCancellationTest(
5386+
client, "signal"
5387+
).test_warning_is_issued_when_cancellation_causes_exit_with_unfinished_handler()
5388+
5389+
5390+
@dataclass
5391+
class _UnfinishedHandlersWithCancellationTest:
5392+
client: Client
5393+
handler_type: Literal["update", "signal"]
5394+
5395+
async def test_warning_is_issued_when_cancellation_causes_exit_with_unfinished_handler(
5396+
self,
5397+
):
5398+
assert await self._run_workflow_and_get_warning()
5399+
5400+
async def _run_workflow_and_get_warning(self) -> bool:
5401+
workflow_id = f"wf-{uuid.uuid4()}"
5402+
update_id = "update-id"
5403+
task_queue = "tq"
5404+
5405+
# We require a cancellation request and an update to be delivered in the same WFT. To do
5406+
# this we send the start, cancel, and update/signal requests, and then start the worker
5407+
# after they've all been accepted by the server.
5408+
handle = await self.client.start_workflow(
5409+
UnfinishedHandlersWithCancellationWorkflow.run,
5410+
id=workflow_id,
5411+
task_queue=task_queue,
5412+
)
5413+
await handle.cancel()
5414+
5415+
if self.handler_type == "update":
5416+
update_task = asyncio.create_task(
5417+
handle.execute_update(
5418+
UnfinishedHandlersWithCancellationWorkflow.my_update, id=update_id
5419+
)
5420+
)
5421+
await assert_eq_eventually(
5422+
True, lambda: update_admitted(self.client, workflow_id, update_id)
5423+
)
5424+
else:
5425+
await handle.signal(UnfinishedHandlersWithCancellationWorkflow.my_signal)
5426+
5427+
async with new_worker(
5428+
self.client,
5429+
UnfinishedHandlersWithCancellationWorkflow,
5430+
task_queue=task_queue,
5431+
):
5432+
with pytest.WarningsRecorder() as warnings:
5433+
if self.handler_type == "update":
5434+
assert update_task
5435+
with pytest.raises(RPCError) as err:
5436+
await update_task
5437+
assert (
5438+
err.value.status == RPCStatusCode.NOT_FOUND
5439+
and "workflow execution already completed"
5440+
in str(err.value).lower()
5441+
)
5442+
5443+
with pytest.raises(WorkflowFailureError) as err:
5444+
await handle.result()
5445+
assert "workflow execution failed" in str(err.value).lower()
5446+
5447+
unfinished_handler_warning_emitted = any(
5448+
issubclass(w.category, self._unfinished_handler_warning_cls)
5449+
for w in warnings
5450+
)
5451+
return unfinished_handler_warning_emitted
5452+
5453+
@property
5454+
def _unfinished_handler_warning_cls(self) -> Type:
5455+
return {
5456+
"update": workflow.UnfinishedUpdateHandlersWarning,
5457+
"signal": workflow.UnfinishedSignalHandlersWarning,
5458+
}[self.handler_type]
5459+
5460+
5461+
async def update_admitted(client: Client, workflow_id: str, update_id: str) -> bool:
5462+
"""Return true if update has been admitted by server"""
5463+
try:
5464+
await client.workflow_service.poll_workflow_execution_update(
5465+
PollWorkflowExecutionUpdateRequest(
5466+
namespace=client.namespace,
5467+
update_ref=UpdateRef(
5468+
workflow_execution=WorkflowExecution(workflow_id=workflow_id),
5469+
update_id=update_id,
5470+
),
5471+
)
5472+
)
5473+
return True
5474+
except RPCError as err:
5475+
print(err)
5476+
if err.status != RPCStatusCode.NOT_FOUND:
5477+
raise
5478+
return False

0 commit comments

Comments
 (0)