From f9aa7c0d63ee0be52384c80979e9fc653b67485b Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Tue, 1 Jul 2025 14:44:09 -0700 Subject: [PATCH 1/4] Making raise on cancellation the default --- temporalio/worker/_workflow_instance.py | 10 ++-------- tests/worker/test_workflow.py | 4 ---- 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index d9de1ba40..94235dcb4 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -1622,14 +1622,11 @@ async def run_activity() -> Any: if handle._result_fut.done(): # TODO in next release, check sdk flag when not replaying instead of global override, remove the override, and set flag use if ( - ( - not self._is_replaying - and _raise_on_cancelling_completed_activity_override - ) + not self._is_replaying or _WorkflowLogicFlag.RAISE_ON_CANCELLING_COMPLETED_ACTIVITY in self._current_internal_flags ): - # self._current_completion.successful.used_internal_flags.append(WorkflowLogicFlag.RAISE_ON_CANCELLING_COMPLETED_ACTIVITY) + self._current_completion.successful.used_internal_flags.append(_WorkflowLogicFlag.RAISE_ON_CANCELLING_COMPLETED_ACTIVITY) raise # Send a cancel request to the activity handle._apply_cancel_command(self._add_command()) @@ -3139,6 +3136,3 @@ class _WorkflowLogicFlag(IntEnum): RAISE_ON_CANCELLING_COMPLETED_ACTIVITY = 1 - -# Used by tests to validate behavior prior to SDK flag becoming default -_raise_on_cancelling_completed_activity_override = False diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 0b3574563..fcf06fa7a 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -8005,8 +8005,6 @@ async def test_quick_activity_swallows_cancellation(client: Client): activities=[short_activity_async], activity_executor=concurrent.futures.ThreadPoolExecutor(max_workers=1), ) as worker: - temporalio.worker._workflow_instance._raise_on_cancelling_completed_activity_override = True - for i in range(10): wf_duration = random.uniform(5.0, 15.0) wf_handle = await client.start_workflow( @@ -8028,8 +8026,6 @@ async def test_quick_activity_swallows_cancellation(client: Client): assert isinstance(cause, CancelledError) assert cause.message == "Workflow cancelled" - temporalio.worker._workflow_instance._raise_on_cancelling_completed_activity_override = False - async def test_workflow_logging_trace_identifier(client: Client): with LogCapturer().logs_captured( From 54c7ac77afbf269541fd332c7b0b8073f6bdc725 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Tue, 1 Jul 2025 14:46:22 -0700 Subject: [PATCH 2/4] Lint --- temporalio/worker/_workflow_instance.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index 94235dcb4..597bb24d7 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -1626,7 +1626,9 @@ async def run_activity() -> Any: or _WorkflowLogicFlag.RAISE_ON_CANCELLING_COMPLETED_ACTIVITY in self._current_internal_flags ): - self._current_completion.successful.used_internal_flags.append(_WorkflowLogicFlag.RAISE_ON_CANCELLING_COMPLETED_ACTIVITY) + self._current_completion.successful.used_internal_flags.append( + _WorkflowLogicFlag.RAISE_ON_CANCELLING_COMPLETED_ACTIVITY + ) raise # Send a cancel request to the activity handle._apply_cancel_command(self._add_command()) @@ -3135,4 +3137,3 @@ class _WorkflowLogicFlag(IntEnum): """Flags that may be set on task/activation completion to differentiate new from old workflow behavior.""" RAISE_ON_CANCELLING_COMPLETED_ACTIVITY = 1 - From 399cb6268ccfc49abf61c889e1289f8d08ad94fb Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Wed, 2 Jul 2025 08:52:05 -0700 Subject: [PATCH 3/4] Remove todo --- temporalio/bridge/worker.py | 23 +++++++++++++++++++++++ temporalio/worker/_workflow_instance.py | 1 - 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/temporalio/bridge/worker.py b/temporalio/bridge/worker.py index 74cf55bfd..4728447ee 100644 --- a/temporalio/bridge/worker.py +++ b/temporalio/bridge/worker.py @@ -316,6 +316,23 @@ async def _decode_payload( """Decode a payload with the given codec.""" return await _apply_to_payload(payload, codec.decode) +async def _decode_headers( + headers: google.protobuf.internal.containers.MessageMap[str, temporalio.api.common.v1.Payload], + codec: temporalio.converter.PayloadCodec, +) -> None: + """Decode payloads in a header map with the given codec.""" + return await _apply_to_headers(headers, codec.decode) + +async def _apply_to_headers( + headers: google.protobuf.internal.containers.MessageMap[str, temporalio.api.common.v1.Payload], + cb: Callable[ + [Sequence[temporalio.api.common.v1.Payload]], + Awaitable[List[temporalio.api.common.v1.Payload]], + ],) -> None: + """Decode payloads with the given codec.""" + for payload in headers.values(): + new_payload = (await cb([payload]))[0] + payload.CopyFrom(new_payload) async def _encode_payloads( payloads: PayloadContainer, @@ -341,6 +358,7 @@ async def decode_activation( for job in act.jobs: if job.HasField("query_workflow"): await _decode_payloads(job.query_workflow.arguments, codec) + await _decode_headers(job.query_workflow.headers, codec) elif job.HasField("resolve_activity"): if job.resolve_activity.result.HasField("cancelled"): await codec.decode_failure( @@ -385,8 +403,12 @@ async def decode_activation( await codec.decode_failure(job.resolve_signal_external_workflow.failure) elif job.HasField("signal_workflow"): await _decode_payloads(job.signal_workflow.input, codec) + await _decode_headers(job.signal_workflow.headers, codec) + elif job.HasField("initialize_workflow"): await _decode_payloads(job.initialize_workflow.arguments, codec) + await _decode_headers(job.initialize_workflow.headers, codec) + if job.initialize_workflow.HasField("continued_failure"): await codec.decode_failure(job.initialize_workflow.continued_failure) for val in job.initialize_workflow.memo.fields.values(): @@ -400,6 +422,7 @@ async def decode_activation( val.data = new_payload.data elif job.HasField("do_update"): await _decode_payloads(job.do_update.input, codec) + await _decode_headers(job.do_update.headers, codec) async def encode_completion( diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index 597bb24d7..528b42197 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -1620,7 +1620,6 @@ async def run_activity() -> Any: # If an activity future completes at the same time as a cancellation is being processed, the cancellation would be swallowed # _WorkflowLogicFlag.RAISE_ON_CANCELLING_COMPLETED_ACTIVITY will correctly reraise the exception if handle._result_fut.done(): - # TODO in next release, check sdk flag when not replaying instead of global override, remove the override, and set flag use if ( not self._is_replaying or _WorkflowLogicFlag.RAISE_ON_CANCELLING_COMPLETED_ACTIVITY From ba5de41658f98d0a6abd4cdd96090a0ae40725c0 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Wed, 2 Jul 2025 12:17:34 -0700 Subject: [PATCH 4/4] Remove code from other feature --- temporalio/bridge/worker.py | 23 ----------------------- 1 file changed, 23 deletions(-) diff --git a/temporalio/bridge/worker.py b/temporalio/bridge/worker.py index 4728447ee..74cf55bfd 100644 --- a/temporalio/bridge/worker.py +++ b/temporalio/bridge/worker.py @@ -316,23 +316,6 @@ async def _decode_payload( """Decode a payload with the given codec.""" return await _apply_to_payload(payload, codec.decode) -async def _decode_headers( - headers: google.protobuf.internal.containers.MessageMap[str, temporalio.api.common.v1.Payload], - codec: temporalio.converter.PayloadCodec, -) -> None: - """Decode payloads in a header map with the given codec.""" - return await _apply_to_headers(headers, codec.decode) - -async def _apply_to_headers( - headers: google.protobuf.internal.containers.MessageMap[str, temporalio.api.common.v1.Payload], - cb: Callable[ - [Sequence[temporalio.api.common.v1.Payload]], - Awaitable[List[temporalio.api.common.v1.Payload]], - ],) -> None: - """Decode payloads with the given codec.""" - for payload in headers.values(): - new_payload = (await cb([payload]))[0] - payload.CopyFrom(new_payload) async def _encode_payloads( payloads: PayloadContainer, @@ -358,7 +341,6 @@ async def decode_activation( for job in act.jobs: if job.HasField("query_workflow"): await _decode_payloads(job.query_workflow.arguments, codec) - await _decode_headers(job.query_workflow.headers, codec) elif job.HasField("resolve_activity"): if job.resolve_activity.result.HasField("cancelled"): await codec.decode_failure( @@ -403,12 +385,8 @@ async def decode_activation( await codec.decode_failure(job.resolve_signal_external_workflow.failure) elif job.HasField("signal_workflow"): await _decode_payloads(job.signal_workflow.input, codec) - await _decode_headers(job.signal_workflow.headers, codec) - elif job.HasField("initialize_workflow"): await _decode_payloads(job.initialize_workflow.arguments, codec) - await _decode_headers(job.initialize_workflow.headers, codec) - if job.initialize_workflow.HasField("continued_failure"): await codec.decode_failure(job.initialize_workflow.continued_failure) for val in job.initialize_workflow.memo.fields.values(): @@ -422,7 +400,6 @@ async def decode_activation( val.data = new_payload.data elif job.HasField("do_update"): await _decode_payloads(job.do_update.input, codec) - await _decode_headers(job.do_update.headers, codec) async def encode_completion(