From 83382c84ab9e0a01330e302e1e42589a31a7b13b Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Thu, 14 Aug 2025 09:11:51 -0400 Subject: [PATCH 1/3] Failing test: USE_EXISTING conflict policy --- temporalio/nexus/_operation_context.py | 8 -- .../test_use_existing_conflict_policy.py | 124 ++++++++++++++++++ 2 files changed, 124 insertions(+), 8 deletions(-) create mode 100644 tests/nexus/test_use_existing_conflict_policy.py diff --git a/temporalio/nexus/_operation_context.py b/temporalio/nexus/_operation_context.py index 4439614a3..78b305e56 100644 --- a/temporalio/nexus/_operation_context.py +++ b/temporalio/nexus/_operation_context.py @@ -404,14 +404,6 @@ async def start_workflow( # attachRequestId: true, # }; # } - if ( - id_conflict_policy - == temporalio.common.WorkflowIDConflictPolicy.USE_EXISTING - ): - raise RuntimeError( - "WorkflowIDConflictPolicy.USE_EXISTING is not yet supported when starting a workflow " - "that backs a Nexus operation (Python SDK Nexus support is at Pre-release stage)." - ) # We must pass nexus_completion_callbacks, workflow_event_links, and request_id, # but these are deliberately not exposed in overloads, hence the type-check diff --git a/tests/nexus/test_use_existing_conflict_policy.py b/tests/nexus/test_use_existing_conflict_policy.py new file mode 100644 index 000000000..03d2894e9 --- /dev/null +++ b/tests/nexus/test_use_existing_conflict_policy.py @@ -0,0 +1,124 @@ +from __future__ import annotations + +import asyncio +import uuid +from dataclasses import dataclass +from typing import Optional + +import pytest +from nexusrpc.handler import service_handler + +from temporalio import nexus, workflow +from temporalio.client import Client +from temporalio.common import WorkflowIDConflictPolicy +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker +from tests.helpers.nexus import create_nexus_endpoint, make_nexus_endpoint_name + + +@dataclass +class OpInput: + workflow_id: str + conflict_policy: WorkflowIDConflictPolicy + + +@workflow.defn +class HandlerWorkflow: + def __init__(self) -> None: + self.result: Optional[str] = None + + @workflow.run + async def run(self) -> str: + await workflow.wait_condition(lambda: self.result is not None) + assert self.result + return self.result + + @workflow.signal + def complete(self, result: str) -> None: + self.result = result + + +@service_handler +class NexusService: + @nexus.workflow_run_operation + async def workflow_backed_operation( + self, ctx: nexus.WorkflowRunOperationContext, input: OpInput + ) -> nexus.WorkflowHandle[str]: + return await ctx.start_workflow( + HandlerWorkflow.run, + id=input.workflow_id, + id_conflict_policy=input.conflict_policy, + ) + + +@dataclass +class CallerWorkflowInput: + workflow_id: str + task_queue: str + num_operations: int + + +@workflow.defn +class CallerWorkflow: + def __init__(self) -> None: + self._nexus_operations_have_started = asyncio.Event() + + @workflow.run + async def run(self, input: CallerWorkflowInput) -> list[str]: + nexus_client = workflow.create_nexus_client( + service=NexusService, endpoint=make_nexus_endpoint_name(input.task_queue) + ) + + op_input = OpInput( + workflow_id=input.workflow_id, + conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING, + ) + + handles = [] + for _ in range(input.num_operations): + handles.append( + await nexus_client.start_operation( + NexusService.workflow_backed_operation, op_input + ) + ) + self._nexus_operations_have_started.set() + return await asyncio.gather(*handles) + + @workflow.update + async def nexus_operations_have_started(self) -> None: + await self._nexus_operations_have_started.wait() + + +async def test_multiple_operation_invocations_can_connect_to_same_handler_workflow( + client: Client, env: WorkflowEnvironment +): + if env.supports_time_skipping: + pytest.skip("Nexus tests don't work with time-skipping server") + + task_queue = str(uuid.uuid4()) + workflow_id = str(uuid.uuid4()) + + async with Worker( + client, + nexus_service_handlers=[NexusService()], + workflows=[CallerWorkflow, HandlerWorkflow], + task_queue=task_queue, + ): + await create_nexus_endpoint(task_queue, client) + caller_handle = await client.start_workflow( + CallerWorkflow.run, + args=[ + CallerWorkflowInput( + workflow_id=workflow_id, + task_queue=task_queue, + num_operations=5, + ) + ], + id=str(uuid.uuid4()), + task_queue=task_queue, + ) + await caller_handle.execute_update(CallerWorkflow.nexus_operations_have_started) + await client.get_workflow_handle(workflow_id).signal( + HandlerWorkflow.complete, "test-result" + ) + assert await caller_handle.result() == ["test-result"] * 5 From f3cf5eebb4849c0cc8c39c526aa31910b54ae89f Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Thu, 14 Aug 2025 10:20:23 -0400 Subject: [PATCH 2/3] Implement feature --- temporalio/client.py | 8 +++ temporalio/nexus/_operation_context.py | 89 ++++++++++++++----------- temporalio/worker/_workflow_instance.py | 7 -- 3 files changed, 58 insertions(+), 46 deletions(-) diff --git a/temporalio/client.py b/temporalio/client.py index 71bdf3dee..50fab46ba 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -57,6 +57,7 @@ import temporalio.converter import temporalio.exceptions import temporalio.nexus +import temporalio.nexus._operation_context import temporalio.runtime import temporalio.service import temporalio.workflow @@ -5877,6 +5878,12 @@ async def _build_start_workflow_execution_request( ) # Links are duplicated on request for compatibility with older server versions. req.links.extend(links) + + if temporalio.nexus._operation_context._in_nexus_backing_workflow_start_context(): + req.on_conflict_options.attach_request_id = True + req.on_conflict_options.attach_completion_callbacks = True + req.on_conflict_options.attach_links = True + return req async def _build_signal_with_start_workflow_execution_request( @@ -5932,6 +5939,7 @@ async def _populate_start_workflow_execution_request( "temporalio.api.enums.v1.WorkflowIdConflictPolicy.ValueType", int(input.id_conflict_policy), ) + if input.retry_policy is not None: input.retry_policy.apply_to_proto(req.retry_policy) req.cron_schedule = input.cron_schedule diff --git a/temporalio/nexus/_operation_context.py b/temporalio/nexus/_operation_context.py index 78b305e56..47dd693df 100644 --- a/temporalio/nexus/_operation_context.py +++ b/temporalio/nexus/_operation_context.py @@ -3,6 +3,7 @@ import dataclasses import logging from collections.abc import Awaitable, Mapping, MutableMapping, Sequence +from contextlib import contextmanager from contextvars import ContextVar from dataclasses import dataclass from datetime import timedelta @@ -10,6 +11,7 @@ TYPE_CHECKING, Any, Callable, + Generator, Optional, Union, overload, @@ -47,6 +49,10 @@ ContextVar("temporal-cancel-operation-context") ) +_temporal_nexus_backing_workflow_start_context: ContextVar[bool] = ContextVar( + "temporal-nexus-backing-workflow-start-context" +) + @dataclass(frozen=True) class Info: @@ -96,6 +102,19 @@ def _try_temporal_context() -> ( return start_ctx or cancel_ctx +@contextmanager +def _nexus_backing_workflow_start_context() -> Generator[None, None, None]: + token = _temporal_nexus_backing_workflow_start_context.set(True) + try: + yield + finally: + _temporal_nexus_backing_workflow_start_context.reset(token) + + +def _in_nexus_backing_workflow_start_context() -> bool: + return _temporal_nexus_backing_workflow_start_context.get(False) + + @dataclass class _TemporalStartOperationContext: """Context for a Nexus start operation being handled by a Temporal Nexus Worker.""" @@ -396,48 +415,40 @@ async def start_workflow( Nexus caller is itself a workflow, this means that the workflow in the caller namespace web UI will contain links to the started workflow, and vice versa. """ - # TODO(nexus-preview): When sdk-python supports on_conflict_options, Typescript does this: - # if (workflowOptions.workflowIdConflictPolicy === 'USE_EXISTING') { - # internalOptions.onConflictOptions = { - # attachLinks: true, - # attachCompletionCallbacks: true, - # attachRequestId: true, - # }; - # } - # We must pass nexus_completion_callbacks, workflow_event_links, and request_id, # but these are deliberately not exposed in overloads, hence the type-check # violation. - wf_handle = await self._temporal_context.client.start_workflow( # type: ignore - workflow=workflow, - arg=arg, - args=args, - id=id, - task_queue=task_queue or self._temporal_context.info().task_queue, - result_type=result_type, - execution_timeout=execution_timeout, - run_timeout=run_timeout, - task_timeout=task_timeout, - id_reuse_policy=id_reuse_policy, - id_conflict_policy=id_conflict_policy, - retry_policy=retry_policy, - cron_schedule=cron_schedule, - memo=memo, - search_attributes=search_attributes, - static_summary=static_summary, - static_details=static_details, - start_delay=start_delay, - start_signal=start_signal, - start_signal_args=start_signal_args, - rpc_metadata=rpc_metadata, - rpc_timeout=rpc_timeout, - request_eager_start=request_eager_start, - priority=priority, - versioning_override=versioning_override, - callbacks=self._temporal_context._get_callbacks(), - workflow_event_links=self._temporal_context._get_workflow_event_links(), - request_id=self._temporal_context.nexus_context.request_id, - ) + with _nexus_backing_workflow_start_context(): + wf_handle = await self._temporal_context.client.start_workflow( # type: ignore + workflow=workflow, + arg=arg, + args=args, + id=id, + task_queue=task_queue or self._temporal_context.info().task_queue, + result_type=result_type, + execution_timeout=execution_timeout, + run_timeout=run_timeout, + task_timeout=task_timeout, + id_reuse_policy=id_reuse_policy, + id_conflict_policy=id_conflict_policy, + retry_policy=retry_policy, + cron_schedule=cron_schedule, + memo=memo, + search_attributes=search_attributes, + static_summary=static_summary, + static_details=static_details, + start_delay=start_delay, + start_signal=start_signal, + start_signal_args=start_signal_args, + rpc_metadata=rpc_metadata, + rpc_timeout=rpc_timeout, + request_eager_start=request_eager_start, + priority=priority, + versioning_override=versioning_override, + callbacks=self._temporal_context._get_callbacks(), + workflow_event_links=self._temporal_context._get_workflow_event_links(), + request_id=self._temporal_context.nexus_context.request_id, + ) self._temporal_context._add_outbound_links(wf_handle) diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index ada236ab2..c93155672 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -3029,13 +3029,6 @@ def operation_token(self) -> Optional[str]: def __await__(self) -> Generator[Any, Any, OutputT]: return self._task.__await__() - def __repr__(self) -> str: - return ( - f"{self._start_fut} " - f"{self._result_fut} " - f"Task[{self._task._state}] fut_waiter = {self._task._fut_waiter}) ({self._task._must_cancel})" # type: ignore - ) - def cancel(self) -> bool: return self._task.cancel() From 84c5ac7b02930ac3f6b01dfad3c43057d5ecc832 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Mon, 25 Aug 2025 14:04:57 -0400 Subject: [PATCH 3/3] Add some documentation comments --- temporalio/nexus/_operation_context.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/temporalio/nexus/_operation_context.py b/temporalio/nexus/_operation_context.py index 47dd693df..515b5e814 100644 --- a/temporalio/nexus/_operation_context.py +++ b/temporalio/nexus/_operation_context.py @@ -49,6 +49,10 @@ ContextVar("temporal-cancel-operation-context") ) +# A Nexus start handler might start zero or more workflows as usual using a Temporal client. In +# addition, it may start one "nexus-backing" workflow, using +# WorkflowRunOperationContext.start_workflow. This context is active while the latter is being done. +# It is thus a narrower context than _temporal_start_operation_context. _temporal_nexus_backing_workflow_start_context: ContextVar[bool] = ContextVar( "temporal-nexus-backing-workflow-start-context" ) @@ -418,6 +422,12 @@ async def start_workflow( # We must pass nexus_completion_callbacks, workflow_event_links, and request_id, # but these are deliberately not exposed in overloads, hence the type-check # violation. + + # Here we are starting a "nexus-backing" workflow. That means that the StartWorkflow request + # contains nexus-specific data such as a completion callback (used by the handler server + # namespace to deliver the result to the caller namespace when the workflow reaches a + # terminal state) and inbound links to the caller workflow (attached to history events of + # the workflow started in the handler namespace, and displayed in the UI). with _nexus_backing_workflow_start_context(): wf_handle = await self._temporal_context.client.start_workflow( # type: ignore workflow=workflow,