@@ -5360,9 +5360,15 @@ def _unfinished_handler_warning_cls(self) -> Type:
5360
5360
5361
5361
5362
5362
@workflow .defn
5363
- class UnfinishedHandlersWithCancellationWorkflow :
5363
+ class UnfinishedHandlersWithCancellationOrFailureWorkflow :
5364
5364
@workflow .run
5365
- async def run (self ) -> NoReturn :
5365
+ async def run (
5366
+ self , workflow_termination_type : Literal ["cancellation" , "failure" ]
5367
+ ) -> NoReturn :
5368
+ if workflow_termination_type == "failure" :
5369
+ raise ApplicationError (
5370
+ "Deliberately failing workflow with an unfinished handler"
5371
+ )
5366
5372
await workflow .wait_condition (lambda : False )
5367
5373
5368
5374
@workflow .update
@@ -5376,23 +5382,44 @@ async def my_signal(self):
5376
5382
5377
5383
5378
5384
async def test_unfinished_update_handler_with_workflow_cancellation (client : Client ):
5379
- await _UnfinishedHandlersWithCancellationTest (
5380
- client , "update"
5381
- ).test_warning_is_issued_when_cancellation_causes_exit_with_unfinished_handler ()
5385
+ await _UnfinishedHandlersWithCancellationOrFailureTest (
5386
+ client ,
5387
+ "update" ,
5388
+ "cancellation" ,
5389
+ ).test_warning_is_issued_when_cancellation_or_failure_causes_exit_with_unfinished_handler ()
5382
5390
5383
5391
5384
5392
async def test_unfinished_signal_handler_with_workflow_cancellation (client : Client ):
5385
- await _UnfinishedHandlersWithCancellationTest (
5386
- client , "signal"
5387
- ).test_warning_is_issued_when_cancellation_causes_exit_with_unfinished_handler ()
5393
+ await _UnfinishedHandlersWithCancellationOrFailureTest (
5394
+ client ,
5395
+ "signal" ,
5396
+ "cancellation" ,
5397
+ ).test_warning_is_issued_when_cancellation_or_failure_causes_exit_with_unfinished_handler ()
5398
+
5399
+
5400
+ async def test_unfinished_update_handler_with_workflow_failure (client : Client ):
5401
+ await _UnfinishedHandlersWithCancellationOrFailureTest (
5402
+ client ,
5403
+ "update" ,
5404
+ "failure" ,
5405
+ ).test_warning_is_issued_when_cancellation_or_failure_causes_exit_with_unfinished_handler ()
5406
+
5407
+
5408
+ async def test_unfinished_signal_handler_with_workflow_failure (client : Client ):
5409
+ await _UnfinishedHandlersWithCancellationOrFailureTest (
5410
+ client ,
5411
+ "signal" ,
5412
+ "failure" ,
5413
+ ).test_warning_is_issued_when_cancellation_or_failure_causes_exit_with_unfinished_handler ()
5388
5414
5389
5415
5390
5416
@dataclass
5391
- class _UnfinishedHandlersWithCancellationTest :
5417
+ class _UnfinishedHandlersWithCancellationOrFailureTest :
5392
5418
client : Client
5393
5419
handler_type : Literal ["update" , "signal" ]
5420
+ workflow_termination_type : Literal ["cancellation" , "failure" ]
5394
5421
5395
- async def test_warning_is_issued_when_cancellation_causes_exit_with_unfinished_handler (
5422
+ async def test_warning_is_issued_when_cancellation_or_failure_causes_exit_with_unfinished_handler (
5396
5423
self ,
5397
5424
):
5398
5425
assert await self ._run_workflow_and_get_warning ()
@@ -5402,31 +5429,37 @@ async def _run_workflow_and_get_warning(self) -> bool:
5402
5429
update_id = "update-id"
5403
5430
task_queue = "tq"
5404
5431
5405
- # We require a cancellation request and an update to be delivered in the same WFT. To do
5406
- # this we send the start, cancel, and update/signal requests, and then start the worker
5407
- # after they've all been accepted by the server.
5432
+ # We require a startWorkflow, an update, and maybe a cancellation request, to be delivered
5433
+ # in the same WFT. To do this we start the worker after they've all been accepted by the
5434
+ # server.
5408
5435
handle = await self .client .start_workflow (
5409
- UnfinishedHandlersWithCancellationWorkflow .run ,
5436
+ UnfinishedHandlersWithCancellationOrFailureWorkflow .run ,
5437
+ self .workflow_termination_type ,
5410
5438
id = workflow_id ,
5411
5439
task_queue = task_queue ,
5412
5440
)
5413
- await handle .cancel ()
5441
+ if self .workflow_termination_type == "cancellation" :
5442
+ await handle .cancel ()
5414
5443
5415
5444
if self .handler_type == "update" :
5416
5445
update_task = asyncio .create_task (
5417
5446
handle .execute_update (
5418
- UnfinishedHandlersWithCancellationWorkflow .my_update , id = update_id
5447
+ UnfinishedHandlersWithCancellationOrFailureWorkflow .my_update ,
5448
+ id = update_id ,
5419
5449
)
5420
5450
)
5421
5451
await assert_eq_eventually (
5422
- True , lambda : workflow_update_exists (self .client , workflow_id , update_id )
5452
+ True ,
5453
+ lambda : workflow_update_exists (self .client , workflow_id , update_id ),
5423
5454
)
5424
5455
else :
5425
- await handle .signal (UnfinishedHandlersWithCancellationWorkflow .my_signal )
5456
+ await handle .signal (
5457
+ UnfinishedHandlersWithCancellationOrFailureWorkflow .my_signal
5458
+ )
5426
5459
5427
5460
async with new_worker (
5428
5461
self .client ,
5429
- UnfinishedHandlersWithCancellationWorkflow ,
5462
+ UnfinishedHandlersWithCancellationOrFailureWorkflow ,
5430
5463
task_queue = task_queue ,
5431
5464
):
5432
5465
with pytest .WarningsRecorder () as warnings :
0 commit comments