diff --git a/tests/nexus/test_workflow_caller_cancellation_types.py b/tests/nexus/test_workflow_caller_cancellation_types.py index 2d44e6416..0590bb2a6 100644 --- a/tests/nexus/test_workflow_caller_cancellation_types.py +++ b/tests/nexus/test_workflow_caller_cancellation_types.py @@ -11,7 +11,6 @@ import temporalio.nexus._operation_handlers from temporalio import exceptions, nexus, workflow from temporalio.api.enums.v1 import EventType -from temporalio.api.history.v1 import HistoryEvent from temporalio.client import ( WithStartWorkflowOperation, WorkflowExecutionStatus, @@ -311,10 +310,11 @@ async def check_behavior_for_abandon( await caller_wf.signal(CallerWorkflow.release) await caller_wf.result() await assert_event_subsequence( + caller_wf, [ - (caller_wf, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED), - (caller_wf, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED), - ] + EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, + EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED, + ], ) assert not await has_event( caller_wf, @@ -347,16 +347,17 @@ async def check_behavior_for_try_cancel( handler_status = (await handler_wf.describe()).status assert handler_status == WorkflowExecutionStatus.CANCELED await assert_event_subsequence( + caller_wf, [ - (caller_wf, EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED), - (caller_wf, EventType.EVENT_TYPE_NEXUS_OPERATION_CANCELED), - ] + EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED, + EventType.EVENT_TYPE_NEXUS_OPERATION_CANCELED, + ], ) op_cancel_requested_event = await get_event_time( caller_wf, EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED, ) - assert result.caller_op_future_resolved < op_cancel_requested_event + assert result.caller_op_future_resolved <= op_cancel_requested_event async def check_behavior_for_wait_cancellation_requested( @@ -382,11 +383,12 @@ async def check_behavior_for_wait_cancellation_requested( handler_status = (await handler_wf.describe()).status assert handler_status == WorkflowExecutionStatus.CANCELED await assert_event_subsequence( + caller_wf, [ - (caller_wf, EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED), - (caller_wf, EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_COMPLETED), - (caller_wf, EventType.EVENT_TYPE_NEXUS_OPERATION_CANCELED), - ] + EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED, + EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_COMPLETED, + EventType.EVENT_TYPE_NEXUS_OPERATION_CANCELED, + ], ) op_cancel_request_completed = await get_event_time( caller_wf, @@ -396,7 +398,7 @@ async def check_behavior_for_wait_cancellation_requested( handler_wf, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED, ) - assert op_cancel_request_completed < result.caller_op_future_resolved < op_canceled + assert op_cancel_request_completed <= result.caller_op_future_resolved < op_canceled async def check_behavior_for_wait_cancellation_completed( @@ -421,23 +423,22 @@ async def check_behavior_for_wait_cancellation_completed( result = await caller_wf.result() await assert_event_subsequence( + caller_wf, [ - (caller_wf, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED), - (caller_wf, EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED), - ( - handler_wf, - EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED, - ), - (handler_wf, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED), - (caller_wf, EventType.EVENT_TYPE_NEXUS_OPERATION_CANCELED), - (caller_wf, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED), - ] + EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, + EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED, + EventType.EVENT_TYPE_NEXUS_OPERATION_CANCELED, + EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED, + ], ) handler_wf_canceled_event = await get_event_time( handler_wf, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED, ) - assert handler_wf_canceled_event < result.caller_op_future_resolved + assert handler_wf_canceled_event <= result.caller_op_future_resolved, ( + "expected caller op future resolved after handler workflow canceled, but got " + f"{result.caller_op_future_resolved} before {handler_wf_canceled_event}" + ) async def has_event(wf_handle: WorkflowHandle, event_type: EventType.ValueType): @@ -459,46 +460,35 @@ async def get_event_time( async def assert_event_subsequence( - expected_events: list[tuple[WorkflowHandle, EventType.ValueType]], + wf_handle: WorkflowHandle, + expected_events: list[EventType.ValueType], ) -> None: """ - Given a sequence of (WorkflowHandle, EventType) pairs, assert that the sorted sequence of events - from both workflows contains that subsequence. + Given a workflow handle and a sequence of event types, assert that the workflow's history + contains that subsequence of events in the order specified. """ - - def _event_time( - item: tuple[WorkflowHandle, HistoryEvent], - ) -> datetime: - return item[1].event_time.ToDatetime() - all_events = [] - handles = {h for h, _ in expected_events} - for h in handles: - async for e in h.fetch_history_events(): - all_events.append((h, e)) - _all_events = iter(sorted(all_events, key=_event_time)) + async for e in wf_handle.fetch_history_events(): + all_events.append(e) + + _all_events = iter(all_events) _expected_events = iter(expected_events) - previous_expected_handle, previous_expected_event_type_name = None, None - for expected_handle, expected_event_type in _expected_events: + previous_expected_event_type_name = None + for expected_event_type in _expected_events: expected_event_type_name = EventType.Name(expected_event_type).removeprefix( "EVENT_TYPE_" ) has_expected = next( - ( - (h, e) - for h, e in _all_events - if h == expected_handle and e.event_type == expected_event_type - ), + (e for e in _all_events if e.event_type == expected_event_type), None, ) if not has_expected: - if previous_expected_handle is not None: - prefix = f"After {previous_expected_event_type_name} in {previous_expected_handle.id}, " + if previous_expected_event_type_name is not None: + prefix = f"After {previous_expected_event_type_name}, " else: prefix = "" pytest.fail( - f"{prefix}expected {expected_event_type_name} in {expected_handle.id}" + f"{prefix}expected {expected_event_type_name} in workflow {wf_handle.id}" ) previous_expected_event_type_name = expected_event_type_name - previous_expected_handle = expected_handle diff --git a/tests/nexus/test_workflow_caller_cancellation_types_when_cancel_handler_fails.py b/tests/nexus/test_workflow_caller_cancellation_types_when_cancel_handler_fails.py index 33b167245..12f99a714 100644 --- a/tests/nexus/test_workflow_caller_cancellation_types_when_cancel_handler_fails.py +++ b/tests/nexus/test_workflow_caller_cancellation_types_when_cancel_handler_fails.py @@ -279,10 +279,11 @@ async def check_behavior_for_abandon( assert result.error_cause_type == "CancelledError" await assert_event_subsequence( + caller_wf, [ - (caller_wf, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED), - (caller_wf, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED), - ] + EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, + EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED, + ], ) assert not await has_event( caller_wf, @@ -301,11 +302,12 @@ async def check_behavior_for_try_cancel( assert result.error_cause_type == "CancelledError" await assert_event_subsequence( + caller_wf, [ - (caller_wf, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED), - (caller_wf, EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED), - (caller_wf, EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_FAILED), - ] + EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, + EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED, + EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_FAILED, + ], ) op_cancel_requested_event = await get_event_time( caller_wf, @@ -317,8 +319,8 @@ async def check_behavior_for_try_cancel( ) assert ( result.caller_op_future_resolved - < op_cancel_requested_event - < op_cancel_request_failed_event + <= op_cancel_requested_event + <= op_cancel_request_failed_event ) @@ -333,12 +335,13 @@ async def check_behavior_for_wait_cancellation_requested( await handler_wf.signal(HandlerWorkflow.set_caller_op_future_resolved) await handler_wf.result() await assert_event_subsequence( + caller_wf, [ - (caller_wf, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED), - (caller_wf, EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED), - (caller_wf, EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_FAILED), - (caller_wf, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED), - ] + EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, + EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED, + EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_FAILED, + EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED, + ], ) op_cancel_request_failed = await get_event_time( caller_wf, @@ -350,8 +353,8 @@ async def check_behavior_for_wait_cancellation_requested( ) assert ( op_cancel_request_failed - < result.caller_op_future_resolved - < handler_wf_completed + <= result.caller_op_future_resolved + <= handler_wf_completed ) @@ -369,14 +372,14 @@ async def check_behavior_for_wait_cancellation_completed( # (caller_wf, EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_FAILED) # (handler_wf, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED) await assert_event_subsequence( + caller_wf, [ - (caller_wf, EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED), - (handler_wf, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED), - (caller_wf, EventType.EVENT_TYPE_NEXUS_OPERATION_COMPLETED), - ] + EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED, + EventType.EVENT_TYPE_NEXUS_OPERATION_COMPLETED, + ], ) handler_wf_completed = await get_event_time( handler_wf, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED, ) - assert handler_wf_completed < result.caller_op_future_resolved + assert handler_wf_completed <= result.caller_op_future_resolved