Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 39 additions & 49 deletions tests/nexus/test_workflow_caller_cancellation_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import temporalio.nexus._operation_handlers
from temporalio import exceptions, nexus, workflow
from temporalio.api.enums.v1 import EventType
from temporalio.api.history.v1 import HistoryEvent
from temporalio.client import (
WithStartWorkflowOperation,
WorkflowExecutionStatus,
Expand Down Expand Up @@ -311,10 +310,11 @@ async def check_behavior_for_abandon(
await caller_wf.signal(CallerWorkflow.release)
await caller_wf.result()
await assert_event_subsequence(
caller_wf,
[
(caller_wf, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED),
(caller_wf, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED),
]
EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED,
],
)
assert not await has_event(
caller_wf,
Expand Down Expand Up @@ -347,16 +347,17 @@ async def check_behavior_for_try_cancel(
handler_status = (await handler_wf.describe()).status
assert handler_status == WorkflowExecutionStatus.CANCELED
await assert_event_subsequence(
caller_wf,
[
(caller_wf, EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED),
(caller_wf, EventType.EVENT_TYPE_NEXUS_OPERATION_CANCELED),
]
EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED,
EventType.EVENT_TYPE_NEXUS_OPERATION_CANCELED,
],
)
op_cancel_requested_event = await get_event_time(
caller_wf,
EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED,
)
assert result.caller_op_future_resolved < op_cancel_requested_event
assert result.caller_op_future_resolved <= op_cancel_requested_event


async def check_behavior_for_wait_cancellation_requested(
Expand All @@ -382,11 +383,12 @@ async def check_behavior_for_wait_cancellation_requested(
handler_status = (await handler_wf.describe()).status
assert handler_status == WorkflowExecutionStatus.CANCELED
await assert_event_subsequence(
caller_wf,
[
(caller_wf, EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED),
(caller_wf, EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_COMPLETED),
(caller_wf, EventType.EVENT_TYPE_NEXUS_OPERATION_CANCELED),
]
EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED,
EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_COMPLETED,
EventType.EVENT_TYPE_NEXUS_OPERATION_CANCELED,
],
)
op_cancel_request_completed = await get_event_time(
caller_wf,
Expand All @@ -396,7 +398,7 @@ async def check_behavior_for_wait_cancellation_requested(
handler_wf,
EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED,
)
assert op_cancel_request_completed < result.caller_op_future_resolved < op_canceled
assert op_cancel_request_completed <= result.caller_op_future_resolved < op_canceled


async def check_behavior_for_wait_cancellation_completed(
Expand All @@ -421,23 +423,22 @@ async def check_behavior_for_wait_cancellation_completed(
result = await caller_wf.result()

await assert_event_subsequence(
caller_wf,
[
(caller_wf, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED),
(caller_wf, EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED),
(
handler_wf,
EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED,
),
(handler_wf, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED),
(caller_wf, EventType.EVENT_TYPE_NEXUS_OPERATION_CANCELED),
(caller_wf, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED),
]
EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED,
EventType.EVENT_TYPE_NEXUS_OPERATION_CANCELED,
EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED,
],
)
handler_wf_canceled_event = await get_event_time(
handler_wf,
EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED,
)
assert handler_wf_canceled_event < result.caller_op_future_resolved
assert handler_wf_canceled_event <= result.caller_op_future_resolved, (
"expected caller op future resolved after handler workflow canceled, but got "
f"{result.caller_op_future_resolved} before {handler_wf_canceled_event}"
)


async def has_event(wf_handle: WorkflowHandle, event_type: EventType.ValueType):
Expand All @@ -459,46 +460,35 @@ async def get_event_time(


async def assert_event_subsequence(
expected_events: list[tuple[WorkflowHandle, EventType.ValueType]],
wf_handle: WorkflowHandle,
expected_events: list[EventType.ValueType],
) -> None:
"""
Given a sequence of (WorkflowHandle, EventType) pairs, assert that the sorted sequence of events
from both workflows contains that subsequence.
Given a workflow handle and a sequence of event types, assert that the workflow's history
contains that subsequence of events in the order specified.
"""

def _event_time(
item: tuple[WorkflowHandle, HistoryEvent],
) -> datetime:
return item[1].event_time.ToDatetime()

all_events = []
handles = {h for h, _ in expected_events}
for h in handles:
async for e in h.fetch_history_events():
all_events.append((h, e))
_all_events = iter(sorted(all_events, key=_event_time))
async for e in wf_handle.fetch_history_events():
all_events.append(e)

_all_events = iter(all_events)
_expected_events = iter(expected_events)

previous_expected_handle, previous_expected_event_type_name = None, None
for expected_handle, expected_event_type in _expected_events:
previous_expected_event_type_name = None
for expected_event_type in _expected_events:
expected_event_type_name = EventType.Name(expected_event_type).removeprefix(
"EVENT_TYPE_"
)
has_expected = next(
(
(h, e)
for h, e in _all_events
if h == expected_handle and e.event_type == expected_event_type
),
(e for e in _all_events if e.event_type == expected_event_type),
None,
)
if not has_expected:
if previous_expected_handle is not None:
prefix = f"After {previous_expected_event_type_name} in {previous_expected_handle.id}, "
if previous_expected_event_type_name is not None:
prefix = f"After {previous_expected_event_type_name}, "
else:
prefix = ""
pytest.fail(
f"{prefix}expected {expected_event_type_name} in {expected_handle.id}"
f"{prefix}expected {expected_event_type_name} in workflow {wf_handle.id}"
)
previous_expected_event_type_name = expected_event_type_name
previous_expected_handle = expected_handle
Original file line number Diff line number Diff line change
Expand Up @@ -279,10 +279,11 @@ async def check_behavior_for_abandon(
assert result.error_cause_type == "CancelledError"

await assert_event_subsequence(
caller_wf,
[
(caller_wf, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED),
(caller_wf, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED),
]
EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED,
],
)
assert not await has_event(
caller_wf,
Expand All @@ -301,11 +302,12 @@ async def check_behavior_for_try_cancel(
assert result.error_cause_type == "CancelledError"

await assert_event_subsequence(
caller_wf,
[
(caller_wf, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED),
(caller_wf, EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED),
(caller_wf, EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_FAILED),
]
EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED,
EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_FAILED,
],
)
op_cancel_requested_event = await get_event_time(
caller_wf,
Expand All @@ -317,8 +319,8 @@ async def check_behavior_for_try_cancel(
)
assert (
result.caller_op_future_resolved
< op_cancel_requested_event
< op_cancel_request_failed_event
<= op_cancel_requested_event
<= op_cancel_request_failed_event
)


Expand All @@ -333,12 +335,13 @@ async def check_behavior_for_wait_cancellation_requested(
await handler_wf.signal(HandlerWorkflow.set_caller_op_future_resolved)
await handler_wf.result()
await assert_event_subsequence(
caller_wf,
[
(caller_wf, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED),
(caller_wf, EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED),
(caller_wf, EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_FAILED),
(caller_wf, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED),
]
EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED,
EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_FAILED,
EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED,
],
)
op_cancel_request_failed = await get_event_time(
caller_wf,
Expand All @@ -350,8 +353,8 @@ async def check_behavior_for_wait_cancellation_requested(
)
assert (
op_cancel_request_failed
< result.caller_op_future_resolved
< handler_wf_completed
<= result.caller_op_future_resolved
<= handler_wf_completed
)


Expand All @@ -369,14 +372,14 @@ async def check_behavior_for_wait_cancellation_completed(
# (caller_wf, EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_FAILED)
# (handler_wf, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED)
await assert_event_subsequence(
caller_wf,
[
(caller_wf, EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED),
(handler_wf, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED),
(caller_wf, EventType.EVENT_TYPE_NEXUS_OPERATION_COMPLETED),
]
EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED,
EventType.EVENT_TYPE_NEXUS_OPERATION_COMPLETED,
],
)
handler_wf_completed = await get_event_time(
handler_wf,
EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED,
)
assert handler_wf_completed < result.caller_op_future_resolved
assert handler_wf_completed <= result.caller_op_future_resolved