From 7509e710a39c26bc7642899ff298106da7b9af14 Mon Sep 17 00:00:00 2001
From: Joseph Azevedo
Date: Fri, 29 Aug 2025 16:18:59 -0700
Subject: [PATCH 01/12] cherry-pick change from fork

---
 temporalio/bridge/client.py            |   4 +-
 temporalio/bridge/src/client.rs        |  46 +++++-
 temporalio/client.py                   | 210 ++++++++++++-------------
 temporalio/nexus/_operation_context.py |  10 +-
 temporalio/service.py                  |   8 +-
 temporalio/testing/_workflow.py        |   2 +-
 tests/api/test_grpc_stub.py            |  19 ++-
 tests/test_client.py                   |   2 +-
 8 files changed, 173 insertions(+), 128 deletions(-)

diff --git a/temporalio/bridge/client.py b/temporalio/bridge/client.py
index ddcee4445..f59fdd0a7 100644
--- a/temporalio/bridge/client.py
+++ b/temporalio/bridge/client.py
@@ -77,7 +77,7 @@ class RpcCall:
     rpc: str
     req: bytes
     retry: bool
-    metadata: Mapping[str, str]
+    metadata: Mapping[str, str | bytes]
     timeout_millis: Optional[int]


@@ -124,7 +124,7 @@ async def call(
         req: google.protobuf.message.Message,
         resp_type: Type[ProtoMessage],
         retry: bool,
-        metadata: Mapping[str, str],
+        metadata: Mapping[str, str | bytes],
         timeout: Optional[timedelta],
     ) -> ProtoMessage:
         """Make RPC call using SDK Core."""
diff --git a/temporalio/bridge/src/client.rs b/temporalio/bridge/src/client.rs
index 2f4ab867e..25db40061 100644
--- a/temporalio/bridge/src/client.rs
+++ b/temporalio/bridge/src/client.rs
@@ -8,7 +8,9 @@ use temporal_client::{
     ConfiguredClient, HealthService, HttpConnectProxyOptions, RetryClient, RetryConfig,
     TemporalServiceClientWithMetrics, TestService, TlsConfig, WorkflowService,
 };
-use tonic::metadata::MetadataKey;
+use tonic::metadata::{
+    AsciiMetadataKey, AsciiMetadataValue, BinaryMetadataKey, BinaryMetadataValue,
+};
 use url::Url;

 use crate::runtime;
@@ -72,10 +74,18 @@ struct RpcCall {
     rpc: String,
     req: Vec<u8>,
     retry: bool,
-    metadata: HashMap<String, String>,
+    metadata: HashMap<String, RpcMetadataValue>,
     timeout_millis: Option<u64>,
 }

+#[derive(FromPyObject)]
+enum RpcMetadataValue {
+    #[pyo3(transparent, annotation = "str")]
+    Str(String),
+    #[pyo3(transparent, annotation = "bytes")]
+    Bytes(Vec<u8>),
+}
+
 pub fn connect_client<'a>(
     py: Python<'a>,
     runtime_ref: &runtime::RuntimeRef,
@@ -536,12 +546,32 @@ fn rpc_req(call: RpcCall) -> PyResult LocalReturnType: ...
@@ -912,7 +912,7 @@ async def execute_update_with_start_workflow(
         *,
         start_workflow_operation: WithStartWorkflowOperation[SelfType, Any],
         id: Optional[str] = None,
-        rpc_metadata: Mapping[str, str] = {},
+        rpc_metadata: Mapping[str, str | bytes] = {},
         rpc_timeout: Optional[timedelta] = None,
     ) -> LocalReturnType: ...

@@ -927,7 +927,7 @@ async def execute_update_with_start_workflow(
         args: MultiParamSpec.args,  # pyright: ignore
         start_workflow_operation: WithStartWorkflowOperation[SelfType, Any],
         id: Optional[str] = None,
-        rpc_metadata: Mapping[str, str] = {},
+        rpc_metadata: Mapping[str, str | bytes] = {},
         rpc_timeout: Optional[timedelta] = None,
     ) -> LocalReturnType: ...

@@ -942,7 +942,7 @@ async def execute_update_with_start_workflow(
         args: Sequence[Any] = [],
         id: Optional[str] = None,
         result_type: Optional[Type] = None,
-        rpc_metadata: Mapping[str, str] = {},
+        rpc_metadata: Mapping[str, str | bytes] = {},
         rpc_timeout: Optional[timedelta] = None,
     ) -> Any: ...

@@ -955,7 +955,7 @@ async def execute_update_with_start_workflow(
         args: Sequence[Any] = [],
         id: Optional[str] = None,
         result_type: Optional[Type] = None,
-        rpc_metadata: Mapping[str, str] = {},
+        rpc_metadata: Mapping[str, str | bytes] = {},
         rpc_timeout: Optional[timedelta] = None,
     ) -> Any:
         """Send an update-with-start request and wait for the update to complete.
@@ -1015,7 +1015,7 @@ async def start_update_with_start_workflow( start_workflow_operation: WithStartWorkflowOperation[SelfType, Any], wait_for_stage: WorkflowUpdateStage, id: Optional[str] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowUpdateHandle[LocalReturnType]: ... @@ -1031,7 +1031,7 @@ async def start_update_with_start_workflow( start_workflow_operation: WithStartWorkflowOperation[SelfType, Any], wait_for_stage: WorkflowUpdateStage, id: Optional[str] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowUpdateHandle[LocalReturnType]: ... @@ -1047,7 +1047,7 @@ async def start_update_with_start_workflow( start_workflow_operation: WithStartWorkflowOperation[SelfType, Any], wait_for_stage: WorkflowUpdateStage, id: Optional[str] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowUpdateHandle[LocalReturnType]: ... @@ -1063,7 +1063,7 @@ async def start_update_with_start_workflow( args: Sequence[Any] = [], id: Optional[str] = None, result_type: Optional[Type] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowUpdateHandle[Any]: ... @@ -1077,7 +1077,7 @@ async def start_update_with_start_workflow( args: Sequence[Any] = [], id: Optional[str] = None, result_type: Optional[Type] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowUpdateHandle[Any]: """Send an update-with-start request and wait for it to be accepted. @@ -1143,7 +1143,7 @@ async def _start_update_with_start( id: Optional[str] = None, result_type: Optional[Type] = None, start_workflow_operation: WithStartWorkflowOperation[SelfType, ReturnType], - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowUpdateHandle[Any]: if wait_for_stage == WorkflowUpdateStage.ADMITTED: @@ -1202,7 +1202,7 @@ def list_workflows( limit: Optional[int] = None, page_size: int = 1000, next_page_token: Optional[bytes] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowExecutionAsyncIterator: """List workflows. @@ -1243,7 +1243,7 @@ def list_workflows( async def count_workflows( self, query: Optional[str] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowExecutionCount: """Count workflows. @@ -1332,7 +1332,7 @@ async def create_schedule( ] = None, static_summary: Optional[str] = None, static_details: Optional[str] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> ScheduleHandle: """Create a schedule and return its handle. @@ -1391,7 +1391,7 @@ async def list_schedules( *, page_size: int = 1000, next_page_token: Optional[bytes] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> ScheduleAsyncIterator: """List schedules. 
@@ -1431,7 +1431,7 @@ async def update_worker_build_id_compatibility( self, task_queue: str, operation: BuildIdOp, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: """Used to add new Build IDs or otherwise update the relative compatibility of Build Ids as @@ -1462,7 +1462,7 @@ async def get_worker_build_id_compatibility( self, task_queue: str, max_sets: Optional[int] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkerBuildIdVersionSets: """Get the Build ID compatibility sets for a specific task queue. @@ -1494,7 +1494,7 @@ async def get_worker_task_reachability( build_ids: Sequence[str], task_queues: Sequence[str] = [], reachability_type: Optional[TaskReachabilityType] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkerTaskReachability: """Determine if some Build IDs for certain Task Queues could have tasks dispatched to them. @@ -1642,7 +1642,7 @@ async def result( self, *, follow_runs: bool = True, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> ReturnType: """Wait for result of the workflow. @@ -1773,7 +1773,7 @@ async def result( async def cancel( self, *, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: """Cancel the workflow. @@ -1809,7 +1809,7 @@ async def cancel( async def describe( self, *, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowExecutionDescription: """Get workflow details. @@ -1848,7 +1848,7 @@ async def fetch_history( *, event_filter_type: WorkflowHistoryEventFilterType = WorkflowHistoryEventFilterType.ALL_EVENT, skip_archival: bool = False, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowHistory: """Get workflow history. @@ -1877,7 +1877,7 @@ def fetch_history_events( wait_new_event: bool = False, event_filter_type: WorkflowHistoryEventFilterType = WorkflowHistoryEventFilterType.ALL_EVENT, skip_archival: bool = False, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowHistoryEventAsyncIterator: """Get workflow history events as an async iterator. @@ -1919,7 +1919,7 @@ def _fetch_history_events_for_run( wait_new_event: bool = False, event_filter_type: WorkflowHistoryEventFilterType = WorkflowHistoryEventFilterType.ALL_EVENT, skip_archival: bool = False, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowHistoryEventAsyncIterator: return self._client._impl.fetch_workflow_history_events( @@ -1943,7 +1943,7 @@ async def query( query: MethodSyncOrAsyncNoParam[SelfType, LocalReturnType], *, reject_condition: Optional[temporalio.common.QueryRejectCondition] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> LocalReturnType: ... 
@@ -1955,7 +1955,7 @@ async def query( arg: ParamType, *, reject_condition: Optional[temporalio.common.QueryRejectCondition] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> LocalReturnType: ... @@ -1970,7 +1970,7 @@ async def query( *, args: Sequence[Any], reject_condition: Optional[temporalio.common.QueryRejectCondition] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> LocalReturnType: ... @@ -1984,7 +1984,7 @@ async def query( args: Sequence[Any] = [], result_type: Optional[Type] = None, reject_condition: Optional[temporalio.common.QueryRejectCondition] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> Any: ... @@ -1996,7 +1996,7 @@ async def query( args: Sequence[Any] = [], result_type: Optional[Type] = None, reject_condition: Optional[temporalio.common.QueryRejectCondition] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> Any: """Query the workflow. @@ -2067,7 +2067,7 @@ async def signal( self, signal: MethodSyncOrAsyncNoParam[SelfType, None], *, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: ... @@ -2078,7 +2078,7 @@ async def signal( signal: MethodSyncOrAsyncSingleParam[SelfType, ParamType, None], arg: ParamType, *, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: ... @@ -2091,7 +2091,7 @@ async def signal( ], *, args: Sequence[Any], - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: ... @@ -2103,7 +2103,7 @@ async def signal( arg: Any = temporalio.common._arg_unset, *, args: Sequence[Any] = [], - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: ... @@ -2113,7 +2113,7 @@ async def signal( arg: Any = temporalio.common._arg_unset, *, args: Sequence[Any] = [], - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: """Send a signal to the workflow. @@ -2156,7 +2156,7 @@ async def terminate( self, *args: Any, reason: Optional[str] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: """Terminate the workflow. @@ -2200,7 +2200,7 @@ async def execute_update( update: temporalio.workflow.UpdateMethodMultiParam[[SelfType], LocalReturnType], *, id: Optional[str] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> LocalReturnType: ... @@ -2214,7 +2214,7 @@ async def execute_update( arg: ParamType, *, id: Optional[str] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> LocalReturnType: ... 
@@ -2228,7 +2228,7 @@ async def execute_update( *, args: MultiParamSpec.args, # pyright: ignore id: Optional[str] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> LocalReturnType: ... @@ -2242,7 +2242,7 @@ async def execute_update( args: Sequence[Any] = [], id: Optional[str] = None, result_type: Optional[Type] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> Any: ... @@ -2254,7 +2254,7 @@ async def execute_update( args: Sequence[Any] = [], id: Optional[str] = None, result_type: Optional[Type] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> Any: """Send an update request to the workflow and wait for it to complete. @@ -2300,7 +2300,7 @@ async def start_update( *, wait_for_stage: WorkflowUpdateStage, id: Optional[str] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowUpdateHandle[LocalReturnType]: ... @@ -2315,7 +2315,7 @@ async def start_update( *, wait_for_stage: WorkflowUpdateStage, id: Optional[str] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowUpdateHandle[LocalReturnType]: ... @@ -2330,7 +2330,7 @@ async def start_update( args: MultiParamSpec.args, # pyright: ignore wait_for_stage: WorkflowUpdateStage, id: Optional[str] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowUpdateHandle[LocalReturnType]: ... @@ -2345,7 +2345,7 @@ async def start_update( args: Sequence[Any] = [], id: Optional[str] = None, result_type: Optional[Type] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowUpdateHandle[Any]: ... @@ -2358,7 +2358,7 @@ async def start_update( args: Sequence[Any] = [], id: Optional[str] = None, result_type: Optional[Type] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowUpdateHandle[Any]: """Send an update request to the workflow and return a handle to it. 
@@ -2406,7 +2406,7 @@ async def _start_update( args: Sequence[Any] = [], id: Optional[str] = None, result_type: Optional[Type] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowUpdateHandle[Any]: if wait_for_stage == WorkflowUpdateStage.ADMITTED: @@ -2520,7 +2520,7 @@ def __init__( static_summary: Optional[str] = None, static_details: Optional[str] = None, start_delay: Optional[timedelta] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, priority: temporalio.common.Priority = temporalio.common.Priority.default, versioning_override: Optional[temporalio.common.VersioningOverride] = None, @@ -2552,7 +2552,7 @@ def __init__( static_summary: Optional[str] = None, static_details: Optional[str] = None, start_delay: Optional[timedelta] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, priority: temporalio.common.Priority = temporalio.common.Priority.default, versioning_override: Optional[temporalio.common.VersioningOverride] = None, @@ -2586,7 +2586,7 @@ def __init__( static_summary: Optional[str] = None, static_details: Optional[str] = None, start_delay: Optional[timedelta] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, priority: temporalio.common.Priority = temporalio.common.Priority.default, versioning_override: Optional[temporalio.common.VersioningOverride] = None, @@ -2620,7 +2620,7 @@ def __init__( static_summary: Optional[str] = None, static_details: Optional[str] = None, start_delay: Optional[timedelta] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, priority: temporalio.common.Priority = temporalio.common.Priority.default, versioning_override: Optional[temporalio.common.VersioningOverride] = None, @@ -2652,7 +2652,7 @@ def __init__( static_summary: Optional[str] = None, static_details: Optional[str] = None, start_delay: Optional[timedelta] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, priority: temporalio.common.Priority = temporalio.common.Priority.default, versioning_override: Optional[temporalio.common.VersioningOverride] = None, @@ -2726,7 +2726,7 @@ def __init__( async def heartbeat( self, *details: Any, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: """Record a heartbeat for the activity. @@ -2750,7 +2750,7 @@ async def complete( self, result: Optional[Any] = temporalio.common._arg_unset, *, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: """Complete the activity. @@ -2775,7 +2775,7 @@ async def fail( error: Exception, *, last_heartbeat_details: Sequence[Any] = [], - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: """Fail the activity. 
@@ -2800,7 +2800,7 @@ async def fail( async def report_cancellation( self, *details: Any, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: """Report the activity as cancelled. @@ -3222,7 +3222,7 @@ async def map_histories( *, event_filter_type: WorkflowHistoryEventFilterType = WorkflowHistoryEventFilterType.ALL_EVENT, skip_archival: bool = False, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> AsyncIterator[WorkflowHistory]: """Create an async iterator consuming all workflows and calling @@ -3425,7 +3425,7 @@ def __init__(self, client: Client, id: str) -> None: async def backfill( self, *backfill: ScheduleBackfill, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: """Backfill the schedule by going through the specified time periods as @@ -3451,7 +3451,7 @@ async def backfill( async def delete( self, *, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: """Delete this schedule. @@ -3472,7 +3472,7 @@ async def delete( async def describe( self, *, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> ScheduleDescription: """Fetch this schedule's description. @@ -3494,7 +3494,7 @@ async def pause( self, *, note: Optional[str] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: """Pause the schedule and set a note. @@ -3518,7 +3518,7 @@ async def trigger( self, *, overlap: Optional[ScheduleOverlapPolicy] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: """Trigger an action on this schedule to happen immediately. @@ -3542,7 +3542,7 @@ async def unpause( self, *, note: Optional[str] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: """Unpause the schedule and set a note. @@ -3567,7 +3567,7 @@ async def update( self, updater: Callable[[ScheduleUpdateInput], Optional[ScheduleUpdate]], *, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: ... @@ -3576,7 +3576,7 @@ async def update( self, updater: Callable[[ScheduleUpdateInput], Awaitable[Optional[ScheduleUpdate]]], *, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: ... @@ -3587,7 +3587,7 @@ async def update( Union[Optional[ScheduleUpdate], Awaitable[Optional[ScheduleUpdate]]], ], *, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: """Update a schedule using a callback to build the update from the @@ -5004,7 +5004,7 @@ def workflow_run_id(self) -> Optional[str]: async def result( self, *, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> LocalReturnType: """Wait for and return the result of the update. 
The result may already be known in which case no network call @@ -5050,7 +5050,7 @@ async def result( async def _poll_until_outcome( self, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: if self._known_outcome: @@ -5248,7 +5248,7 @@ class StartWorkflowInput: static_details: Optional[str] # Type may be absent ret_type: Optional[Type] - rpc_metadata: Mapping[str, str] + rpc_metadata: Mapping[str, str | bytes] rpc_timeout: Optional[timedelta] request_eager_start: bool priority: temporalio.common.Priority @@ -5266,7 +5266,7 @@ class CancelWorkflowInput: id: str run_id: Optional[str] first_execution_run_id: Optional[str] - rpc_metadata: Mapping[str, str] + rpc_metadata: Mapping[str, str | bytes] rpc_timeout: Optional[timedelta] @@ -5276,7 +5276,7 @@ class DescribeWorkflowInput: id: str run_id: Optional[str] - rpc_metadata: Mapping[str, str] + rpc_metadata: Mapping[str, str | bytes] rpc_timeout: Optional[timedelta] @@ -5291,7 +5291,7 @@ class FetchWorkflowHistoryEventsInput: wait_new_event: bool event_filter_type: WorkflowHistoryEventFilterType skip_archival: bool - rpc_metadata: Mapping[str, str] + rpc_metadata: Mapping[str, str | bytes] rpc_timeout: Optional[timedelta] @@ -5302,7 +5302,7 @@ class ListWorkflowsInput: query: Optional[str] page_size: int next_page_token: Optional[bytes] - rpc_metadata: Mapping[str, str] + rpc_metadata: Mapping[str, str | bytes] rpc_timeout: Optional[timedelta] limit: Optional[int] @@ -5312,7 +5312,7 @@ class CountWorkflowsInput: """Input for :py:meth:`OutboundInterceptor.count_workflows`.""" query: Optional[str] - rpc_metadata: Mapping[str, str] + rpc_metadata: Mapping[str, str | bytes] rpc_timeout: Optional[timedelta] @@ -5328,7 +5328,7 @@ class QueryWorkflowInput: headers: Mapping[str, temporalio.api.common.v1.Payload] # Type may be absent ret_type: Optional[Type] - rpc_metadata: Mapping[str, str] + rpc_metadata: Mapping[str, str | bytes] rpc_timeout: Optional[timedelta] @@ -5341,7 +5341,7 @@ class SignalWorkflowInput: signal: str args: Sequence[Any] headers: Mapping[str, temporalio.api.common.v1.Payload] - rpc_metadata: Mapping[str, str] + rpc_metadata: Mapping[str, str | bytes] rpc_timeout: Optional[timedelta] @@ -5354,7 +5354,7 @@ class TerminateWorkflowInput: first_execution_run_id: Optional[str] args: Sequence[Any] reason: Optional[str] - rpc_metadata: Mapping[str, str] + rpc_metadata: Mapping[str, str | bytes] rpc_timeout: Optional[timedelta] @@ -5371,7 +5371,7 @@ class StartWorkflowUpdateInput: wait_for_stage: WorkflowUpdateStage headers: Mapping[str, temporalio.api.common.v1.Payload] ret_type: Optional[Type] - rpc_metadata: Mapping[str, str] + rpc_metadata: Mapping[str, str | bytes] rpc_timeout: Optional[timedelta] @@ -5385,7 +5385,7 @@ class UpdateWithStartUpdateWorkflowInput: wait_for_stage: WorkflowUpdateStage headers: Mapping[str, temporalio.api.common.v1.Payload] ret_type: Optional[Type] - rpc_metadata: Mapping[str, str] + rpc_metadata: Mapping[str, str | bytes] rpc_timeout: Optional[timedelta] @@ -5419,7 +5419,7 @@ class UpdateWithStartStartWorkflowInput: static_details: Optional[str] # Type may be absent ret_type: Optional[Type] - rpc_metadata: Mapping[str, str] + rpc_metadata: Mapping[str, str | bytes] rpc_timeout: Optional[timedelta] priority: temporalio.common.Priority versioning_override: Optional[temporalio.common.VersioningOverride] = None @@ -5443,7 +5443,7 @@ class HeartbeatAsyncActivityInput: id_or_token: Union[AsyncActivityIDReference, bytes] 
details: Sequence[Any] - rpc_metadata: Mapping[str, str] + rpc_metadata: Mapping[str, str | bytes] rpc_timeout: Optional[timedelta] @@ -5453,7 +5453,7 @@ class CompleteAsyncActivityInput: id_or_token: Union[AsyncActivityIDReference, bytes] result: Optional[Any] - rpc_metadata: Mapping[str, str] + rpc_metadata: Mapping[str, str | bytes] rpc_timeout: Optional[timedelta] @@ -5464,7 +5464,7 @@ class FailAsyncActivityInput: id_or_token: Union[AsyncActivityIDReference, bytes] error: Exception last_heartbeat_details: Sequence[Any] - rpc_metadata: Mapping[str, str] + rpc_metadata: Mapping[str, str | bytes] rpc_timeout: Optional[timedelta] @@ -5474,7 +5474,7 @@ class ReportCancellationAsyncActivityInput: id_or_token: Union[AsyncActivityIDReference, bytes] details: Sequence[Any] - rpc_metadata: Mapping[str, str] + rpc_metadata: Mapping[str, str | bytes] rpc_timeout: Optional[timedelta] @@ -5492,7 +5492,7 @@ class CreateScheduleInput: temporalio.common.SearchAttributes, temporalio.common.TypedSearchAttributes ] ] - rpc_metadata: Mapping[str, str] + rpc_metadata: Mapping[str, str | bytes] rpc_timeout: Optional[timedelta] @@ -5502,7 +5502,7 @@ class ListSchedulesInput: page_size: int next_page_token: Optional[bytes] - rpc_metadata: Mapping[str, str] + rpc_metadata: Mapping[str, str | bytes] rpc_timeout: Optional[timedelta] query: Optional[str] = None @@ -5513,7 +5513,7 @@ class BackfillScheduleInput: id: str backfills: Sequence[ScheduleBackfill] - rpc_metadata: Mapping[str, str] + rpc_metadata: Mapping[str, str | bytes] rpc_timeout: Optional[timedelta] @@ -5522,7 +5522,7 @@ class DeleteScheduleInput: """Input for :py:meth:`OutboundInterceptor.delete_schedule`.""" id: str - rpc_metadata: Mapping[str, str] + rpc_metadata: Mapping[str, str | bytes] rpc_timeout: Optional[timedelta] @@ -5531,7 +5531,7 @@ class DescribeScheduleInput: """Input for :py:meth:`OutboundInterceptor.describe_schedule`.""" id: str - rpc_metadata: Mapping[str, str] + rpc_metadata: Mapping[str, str | bytes] rpc_timeout: Optional[timedelta] @@ -5541,7 +5541,7 @@ class PauseScheduleInput: id: str note: Optional[str] - rpc_metadata: Mapping[str, str] + rpc_metadata: Mapping[str, str | bytes] rpc_timeout: Optional[timedelta] @@ -5551,7 +5551,7 @@ class TriggerScheduleInput: id: str overlap: Optional[ScheduleOverlapPolicy] - rpc_metadata: Mapping[str, str] + rpc_metadata: Mapping[str, str | bytes] rpc_timeout: Optional[timedelta] @@ -5561,7 +5561,7 @@ class UnpauseScheduleInput: id: str note: Optional[str] - rpc_metadata: Mapping[str, str] + rpc_metadata: Mapping[str, str | bytes] rpc_timeout: Optional[timedelta] @@ -5574,7 +5574,7 @@ class UpdateScheduleInput: [ScheduleUpdateInput], Union[Optional[ScheduleUpdate], Awaitable[Optional[ScheduleUpdate]]], ] - rpc_metadata: Mapping[str, str] + rpc_metadata: Mapping[str, str | bytes] rpc_timeout: Optional[timedelta] @@ -5584,7 +5584,7 @@ class UpdateWorkerBuildIdCompatibilityInput: task_queue: str operation: BuildIdOp - rpc_metadata: Mapping[str, str] + rpc_metadata: Mapping[str, str | bytes] rpc_timeout: Optional[timedelta] @@ -5594,7 +5594,7 @@ class GetWorkerBuildIdCompatibilityInput: task_queue: str max_sets: Optional[int] - rpc_metadata: Mapping[str, str] + rpc_metadata: Mapping[str, str | bytes] rpc_timeout: Optional[timedelta] @@ -5605,7 +5605,7 @@ class GetWorkerTaskReachabilityInput: build_ids: Sequence[str] task_queues: Sequence[str] reachability: Optional[TaskReachabilityType] - rpc_metadata: Mapping[str, str] + rpc_metadata: Mapping[str, str | bytes] rpc_timeout: Optional[timedelta] 
diff --git a/temporalio/nexus/_operation_context.py b/temporalio/nexus/_operation_context.py index 515b5e814..2d9ff80ec 100644 --- a/temporalio/nexus/_operation_context.py +++ b/temporalio/nexus/_operation_context.py @@ -244,7 +244,7 @@ async def start_workflow( start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, request_eager_start: bool = False, priority: temporalio.common.Priority = temporalio.common.Priority.default, @@ -279,7 +279,7 @@ async def start_workflow( start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, request_eager_start: bool = False, priority: temporalio.common.Priority = temporalio.common.Priority.default, @@ -316,7 +316,7 @@ async def start_workflow( start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, request_eager_start: bool = False, priority: temporalio.common.Priority = temporalio.common.Priority.default, @@ -353,7 +353,7 @@ async def start_workflow( start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, request_eager_start: bool = False, priority: temporalio.common.Priority = temporalio.common.Priority.default, @@ -388,7 +388,7 @@ async def start_workflow( start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, request_eager_start: bool = False, priority: temporalio.common.Priority = temporalio.common.Priority.default, diff --git a/temporalio/service.py b/temporalio/service.py index 189df6ba2..0c44708d6 100644 --- a/temporalio/service.py +++ b/temporalio/service.py @@ -232,7 +232,7 @@ async def check_health( *, service: str = "temporal.api.workflowservice.v1.WorkflowService", retry: bool = False, - metadata: Mapping[str, str] = {}, + metadata: Mapping[str, str | bytes] = {}, timeout: Optional[timedelta] = None, ) -> bool: """Check whether the WorkflowService is up. @@ -282,7 +282,7 @@ async def _rpc_call( *, service: str, retry: bool, - metadata: Mapping[str, str], + metadata: Mapping[str, str | bytes], timeout: Optional[timedelta], ) -> ServiceResponse: raise NotImplementedError @@ -1257,7 +1257,7 @@ async def __call__( req: ServiceRequest, *, retry: bool = False, - metadata: Mapping[str, str] = {}, + metadata: Mapping[str, str | bytes] = {}, timeout: Optional[timedelta] = None, ) -> ServiceResponse: """Invoke underlying client with the given request. 
@@ -1340,7 +1340,7 @@ async def _rpc_call( *, service: str, retry: bool, - metadata: Mapping[str, str], + metadata: Mapping[str, str | bytes], timeout: Optional[timedelta], ) -> ServiceResponse: global LOG_PROTOS diff --git a/temporalio/testing/_workflow.py b/temporalio/testing/_workflow.py index d0eda5580..b3bf96542 100644 --- a/temporalio/testing/_workflow.py +++ b/temporalio/testing/_workflow.py @@ -573,7 +573,7 @@ async def result( self, *, follow_runs: bool = True, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, rpc_timeout: Optional[timedelta] = None, ) -> Any: async with self.env.time_skipping_unlocked(): diff --git a/tests/api/test_grpc_stub.py b/tests/api/test_grpc_stub.py index 25c3f5bee..89ae6a939 100644 --- a/tests/api/test_grpc_stub.py +++ b/tests/api/test_grpc_stub.py @@ -36,9 +36,9 @@ def assert_time_remaining(context: ServicerContext, expected: int) -> None: class SimpleWorkflowServer(WorkflowServiceServicer): def __init__(self) -> None: super().__init__() - self.last_metadata: Mapping[str, str] = {} + self.last_metadata: Mapping[str, str | bytes] = {} - def assert_last_metadata(self, expected: Mapping[str, str]) -> None: + def assert_last_metadata(self, expected: Mapping[str, str | bytes]) -> None: for k, v in expected.items(): assert self.last_metadata.get(k) == v @@ -135,6 +135,21 @@ async def test_grpc_metadata(): } ) + # Binary metadata values should work: + await client.workflow_service.get_system_info( + GetSystemInfoRequest(), + metadata={ + "my-binary-key-bin": b"\x00\x01", + }, + ) + workflow_server.assert_last_metadata( + { + "authorization": "Bearer my-api-key", + "my-meta-key": "my-meta-val", + "my-binary-key-bin": b"\x00\x01", + } + ) + # Overwrite API key via client RPC metadata, confirm there client.rpc_metadata = { "authorization": "my-auth-val1", diff --git a/tests/test_client.py b/tests/test_client.py index 5671bc118..ead3bfed0 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -324,7 +324,7 @@ async def __call__( req: temporalio.api.workflowservice.v1.StartWorkflowExecutionRequest, *, retry: bool = False, - metadata: Mapping[str, str] = {}, + metadata: Mapping[str, str | bytes] = {}, timeout: Optional[timedelta] = None, ) -> temporalio.api.workflowservice.v1.StartWorkflowExecutionResponse: raise self.already_exists_err From 14539b30b3b5ac636cf3c4e534a385b1a56fdfa6 Mon Sep 17 00:00:00 2001 From: Joseph Azevedo Date: Fri, 29 Aug 2025 18:02:46 -0700 Subject: [PATCH 02/12] point sdk-core to fork, add more complete metadata patch --- .gitmodules | 2 +- temporalio/bridge/client.py | 4 +-- temporalio/bridge/sdk-core | 2 +- temporalio/bridge/src/client.rs | 41 +++++++++++++++++++++++--- temporalio/client.py | 12 ++++---- temporalio/service.py | 6 ++-- temporalio/testing/_workflow.py | 4 +-- tests/worker/test_update_with_start.py | 2 +- 8 files changed, 53 insertions(+), 20 deletions(-) diff --git a/.gitmodules b/.gitmodules index ba2ba964a..e8fbd4564 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,3 @@ [submodule "sdk-core"] path = temporalio/bridge/sdk-core - url = https://github.com/temporalio/sdk-core.git + url = https://github.com/jazev-stripe/sdk-core.git diff --git a/temporalio/bridge/client.py b/temporalio/bridge/client.py index f59fdd0a7..83afed086 100644 --- a/temporalio/bridge/client.py +++ b/temporalio/bridge/client.py @@ -59,7 +59,7 @@ class ClientConfig: """Python representation of the Rust struct for configuring the client.""" target_url: str - metadata: Mapping[str, str] 
+ metadata: Mapping[str, str | bytes] api_key: Optional[str] identity: str tls_config: Optional[ClientTlsConfig] @@ -108,7 +108,7 @@ def __init__( self._runtime = runtime self._ref = ref - def update_metadata(self, metadata: Mapping[str, str]) -> None: + def update_metadata(self, metadata: Mapping[str, str | bytes]) -> None: """Update underlying metadata on Core client.""" self._ref.update_metadata(metadata) diff --git a/temporalio/bridge/sdk-core b/temporalio/bridge/sdk-core index 4614dcb8f..e55c07706 160000 --- a/temporalio/bridge/sdk-core +++ b/temporalio/bridge/sdk-core @@ -1 +1 @@ -Subproject commit 4614dcb8f4ffd2cb244eb0a19d7485c896e3459e +Subproject commit e55c077060aa53675374bb777344a0e38d11eec8 diff --git a/temporalio/bridge/src/client.rs b/temporalio/bridge/src/client.rs index 25db40061..4d7fb8d41 100644 --- a/temporalio/bridge/src/client.rs +++ b/temporalio/bridge/src/client.rs @@ -30,7 +30,7 @@ pub struct ClientConfig { target_url: String, client_name: String, client_version: String, - metadata: HashMap, + metadata: HashMap, api_key: Option, identity: String, tls_config: Option, @@ -126,8 +126,10 @@ macro_rules! rpc_call_on_trait { #[pymethods] impl ClientRef { - fn update_metadata(&self, headers: HashMap) { - self.retry_client.get_client().set_headers(headers); + fn update_metadata(&self, headers: HashMap) { + let (ascii_headers, binary_headers) = partition_headers(headers); + self.retry_client.get_client().set_headers(ascii_headers); + self.retry_client.get_client().set_binary_headers(binary_headers); } fn update_api_key(&self, api_key: Option) { @@ -598,11 +600,41 @@ where } } +fn partition_headers( + headers: HashMap, +) -> (HashMap, HashMap>) { + let (ascii_enum_headers, binary_enum_headers): (HashMap<_, _>, HashMap<_, _>) = headers + .into_iter() + .partition(|(_, v)| matches!(v, RpcMetadataValue::Str(_))); + + let ascii_headers = ascii_enum_headers + .into_iter() + .map(|(k, v)| { + let RpcMetadataValue::Str(s) = v else { + unreachable!(); + }; + (k, s) + }) + .collect(); + let binary_headers = binary_enum_headers + .into_iter() + .map(|(k, v)| { + let RpcMetadataValue::Bytes(b) = v else { + unreachable!(); + }; + (k, b) + }) + .collect(); + + (ascii_headers, binary_headers) +} + impl TryFrom for ClientOptions { type Error = PyErr; fn try_from(opts: ClientConfig) -> PyResult { let mut gateway_opts = ClientOptionsBuilder::default(); + let (ascii_headers, binary_headers) = partition_headers(opts.metadata); gateway_opts .target_url( Url::parse(&opts.target_url) @@ -617,7 +649,8 @@ impl TryFrom for ClientOptions { ) .keep_alive(opts.keep_alive_config.map(Into::into)) .http_connect_proxy(opts.http_connect_proxy_config.map(Into::into)) - .headers(Some(opts.metadata)) + .headers(Some(ascii_headers)) + .binary_headers(Some(binary_headers)) .api_key(opts.api_key); // Builder does not allow us to set option here, so we have to make // a conditional to even call it diff --git a/temporalio/client.py b/temporalio/client.py index 3ce3a6c28..7c8d80849 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -118,7 +118,7 @@ async def connect( tls: Union[bool, TLSConfig] = False, retry_config: Optional[RetryConfig] = None, keep_alive_config: Optional[KeepAliveConfig] = KeepAliveConfig.default, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, identity: Optional[str] = None, lazy: bool = False, runtime: Optional[temporalio.runtime.Runtime] = None, @@ -296,7 +296,7 @@ def data_converter(self) -> temporalio.converter.DataConverter: return 
self._config["data_converter"] @property - def rpc_metadata(self) -> Mapping[str, str]: + def rpc_metadata(self) -> Mapping[str, str | bytes]: """Headers for every call made by this client. Do not use mutate this mapping. Rather, set this property with an @@ -305,7 +305,7 @@ def rpc_metadata(self) -> Mapping[str, str]: return self.service_client.config.rpc_metadata @rpc_metadata.setter - def rpc_metadata(self, value: Mapping[str, str]) -> None: + def rpc_metadata(self, value: Mapping[str, str | bytes]) -> None: """Update the headers for this client. Do not mutate this mapping after set. Rather, set an entirely new @@ -7209,7 +7209,7 @@ async def connect( tls: Union[bool, TLSConfig] = True, retry_config: Optional[RetryConfig] = None, keep_alive_config: Optional[KeepAliveConfig] = KeepAliveConfig.default, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, identity: Optional[str] = None, lazy: bool = False, runtime: Optional[temporalio.runtime.Runtime] = None, @@ -7301,7 +7301,7 @@ def identity(self) -> str: return self._service_client.config.identity @property - def rpc_metadata(self) -> Mapping[str, str]: + def rpc_metadata(self) -> Mapping[str, str | bytes]: """Headers for every call made by this client. Do not use mutate this mapping. Rather, set this property with an @@ -7311,7 +7311,7 @@ def rpc_metadata(self) -> Mapping[str, str]: return self.service_client.config.rpc_metadata @rpc_metadata.setter - def rpc_metadata(self, value: Mapping[str, str]) -> None: + def rpc_metadata(self, value: Mapping[str, str | bytes]) -> None: """Update the headers for this client. Do not mutate this mapping after set. Rather, set an entirely new diff --git a/temporalio/service.py b/temporalio/service.py index 0c44708d6..c9a5531b4 100644 --- a/temporalio/service.py +++ b/temporalio/service.py @@ -143,7 +143,7 @@ class ConnectConfig: tls: Union[bool, TLSConfig] = False retry_config: Optional[RetryConfig] = None keep_alive_config: Optional[KeepAliveConfig] = KeepAliveConfig.default - rpc_metadata: Mapping[str, str] = field(default_factory=dict) + rpc_metadata: Mapping[str, str | bytes] = field(default_factory=dict) identity: str = "" lazy: bool = False runtime: Optional[temporalio.runtime.Runtime] = None @@ -264,7 +264,7 @@ def worker_service_client(self) -> _BridgeServiceClient: raise NotImplementedError @abstractmethod - def update_rpc_metadata(self, metadata: Mapping[str, str]) -> None: + def update_rpc_metadata(self, metadata: Mapping[str, str | bytes]) -> None: """Update service client's RPC metadata.""" raise NotImplementedError @@ -1316,7 +1316,7 @@ def worker_service_client(self) -> _BridgeServiceClient: """Underlying service client.""" return self - def update_rpc_metadata(self, metadata: Mapping[str, str]) -> None: + def update_rpc_metadata(self, metadata: Mapping[str, str | bytes]) -> None: """Update Core client metadata.""" # Mutate the bridge config and then only mutate the running client # metadata if already connected diff --git a/temporalio/testing/_workflow.py b/temporalio/testing/_workflow.py index b3bf96542..513603905 100644 --- a/temporalio/testing/_workflow.py +++ b/temporalio/testing/_workflow.py @@ -84,7 +84,7 @@ async def start_local( temporalio.common.QueryRejectCondition ] = None, retry_config: Optional[temporalio.client.RetryConfig] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, identity: Optional[str] = None, tls: bool | temporalio.client.TLSConfig = False, ip: str = "127.0.0.1", @@ -244,7 +244,7 
@@ async def start_time_skipping( temporalio.common.QueryRejectCondition ] = None, retry_config: Optional[temporalio.client.RetryConfig] = None, - rpc_metadata: Mapping[str, str] = {}, + rpc_metadata: Mapping[str, str | bytes] = {}, identity: Optional[str] = None, port: Optional[int] = None, download_dest_dir: Optional[str] = None, diff --git a/tests/worker/test_update_with_start.py b/tests/worker/test_update_with_start.py index c6a11d852..b8766da82 100644 --- a/tests/worker/test_update_with_start.py +++ b/tests/worker/test_update_with_start.py @@ -822,7 +822,7 @@ async def __call__( req: temporalio.api.workflowservice.v1.ExecuteMultiOperationRequest, *, retry: bool = False, - metadata: Mapping[str, str] = {}, + metadata: Mapping[str, str | bytes] = {}, timeout: Optional[timedelta] = None, ) -> temporalio.api.workflowservice.v1.ExecuteMultiOperationResponse: raise self.empty_details_err From 21a453ebf938310671d45b78fe3ef1b3d456152a Mon Sep 17 00:00:00 2001 From: Joseph Azevedo Date: Fri, 29 Aug 2025 18:28:37 -0700 Subject: [PATCH 03/12] add basic test for client-wide metadata --- tests/api/test_grpc_stub.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/tests/api/test_grpc_stub.py b/tests/api/test_grpc_stub.py index 89ae6a939..e31292297 100644 --- a/tests/api/test_grpc_stub.py +++ b/tests/api/test_grpc_stub.py @@ -150,6 +150,25 @@ async def test_grpc_metadata(): } ) + # Binary metadata should be configurable on the client: + client.rpc_metadata = { + "my-binary-key-bin": b"\x00\x01", + "my-binary-key-bin2": b"\x02\x03", + } + await client.workflow_service.get_system_info( + GetSystemInfoRequest(), + metadata={ + "my-binary-key-bin": b"abc", + }, + ) + workflow_server.assert_last_metadata( + { + "authorization": "Bearer my-api-key", + "my-binary-key-bin": b"abc", + "my-binary-key-bin2": b"\x02\x03", + } + ) + # Overwrite API key via client RPC metadata, confirm there client.rpc_metadata = { "authorization": "my-auth-val1", From 5001e9235122d84cf7adb711c3ec8494fefb2e1c Mon Sep 17 00:00:00 2001 From: Joseph Azevedo Date: Fri, 5 Sep 2025 12:50:45 -0700 Subject: [PATCH 04/12] update submodule --- temporalio/bridge/sdk-core | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/temporalio/bridge/sdk-core b/temporalio/bridge/sdk-core index e55c07706..b33e7bc00 160000 --- a/temporalio/bridge/sdk-core +++ b/temporalio/bridge/sdk-core @@ -1 +1 @@ -Subproject commit e55c077060aa53675374bb777344a0e38d11eec8 +Subproject commit b33e7bc007ba4558baaa6711a199bc570de53e7a From 7872b1e28970476cb4670b125a02b5b676a0f9af Mon Sep 17 00:00:00 2001 From: Joseph Azevedo Date: Fri, 5 Sep 2025 17:57:59 -0700 Subject: [PATCH 05/12] revert submodule back to upstream --- .gitmodules | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitmodules b/.gitmodules index e8fbd4564..dd212b1e7 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,3 @@ [submodule "sdk-core"] path = temporalio/bridge/sdk-core - url = https://github.com/jazev-stripe/sdk-core.git + url = https://github.com/temporalio/sdk-core.git \ No newline at end of file From b9977a6e87b019c1ef55f6bc9132f77b2708c9c8 Mon Sep 17 00:00:00 2001 From: Joseph Azevedo Date: Fri, 5 Sep 2025 18:00:09 -0700 Subject: [PATCH 06/12] move submodule to merged sdk-core commit --- temporalio/bridge/sdk-core | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/temporalio/bridge/sdk-core b/temporalio/bridge/sdk-core index b33e7bc00..042372d7b 160000 --- a/temporalio/bridge/sdk-core +++ 
b/temporalio/bridge/sdk-core @@ -1 +1 @@ -Subproject commit b33e7bc007ba4558baaa6711a199bc570de53e7a +Subproject commit 042372d7b0e9931ff04dfac5d92740046ea13fb4 From 47661cf588108458ce38bca2bc1be9419f8e42f5 Mon Sep 17 00:00:00 2001 From: Joseph Azevedo Date: Fri, 5 Sep 2025 18:00:46 -0700 Subject: [PATCH 07/12] add newline back in --- .gitmodules | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitmodules b/.gitmodules index dd212b1e7..ba2ba964a 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,3 @@ [submodule "sdk-core"] path = temporalio/bridge/sdk-core - url = https://github.com/temporalio/sdk-core.git \ No newline at end of file + url = https://github.com/temporalio/sdk-core.git From edb2fad825685400b168a220028a8e4e71b09c31 Mon Sep 17 00:00:00 2001 From: Joseph Azevedo Date: Fri, 12 Sep 2025 08:49:52 -0700 Subject: [PATCH 08/12] consume errors when updating metadata --- temporalio/bridge/src/client.rs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/temporalio/bridge/src/client.rs b/temporalio/bridge/src/client.rs index 4d7fb8d41..0287a5b64 100644 --- a/temporalio/bridge/src/client.rs +++ b/temporalio/bridge/src/client.rs @@ -126,10 +126,19 @@ macro_rules! rpc_call_on_trait { #[pymethods] impl ClientRef { - fn update_metadata(&self, headers: HashMap) { + fn update_metadata(&self, headers: HashMap) -> PyResult<()> { let (ascii_headers, binary_headers) = partition_headers(headers); - self.retry_client.get_client().set_headers(ascii_headers); - self.retry_client.get_client().set_binary_headers(binary_headers); + + self.retry_client + .get_client() + .set_headers(ascii_headers) + .map_err(|err| PyValueError::new_err(err.to_string()))?; + self.retry_client + .get_client() + .set_binary_headers(binary_headers) + .map_err(|err| PyValueError::new_err(err.to_string()))?; + + Ok(()) } fn update_api_key(&self, api_key: Option) { From fa9b1b2095f3de4e69fdf9bcb67f56711e1bb21b Mon Sep 17 00:00:00 2001 From: Joseph Azevedo Date: Fri, 12 Sep 2025 09:26:11 -0700 Subject: [PATCH 09/12] update tests, add repro of issue. reorder client.rpc_metadata setter --- temporalio/client.py | 3 +- tests/api/test_grpc_stub.py | 101 ++++++++++++++++++++++++++++++++++-- 2 files changed, 100 insertions(+), 4 deletions(-) diff --git a/temporalio/client.py b/temporalio/client.py index 7c8d80849..b5f7ea93b 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -312,8 +312,9 @@ def rpc_metadata(self, value: Mapping[str, str | bytes]) -> None: mapping if changes are needed. 
""" # Update config and perform update - self.service_client.config.rpc_metadata = value + # This may raise if the metadata is invalid: self.service_client.update_rpc_metadata(value) + self.service_client.config.rpc_metadata = value @property def api_key(self) -> Optional[str]: diff --git a/tests/api/test_grpc_stub.py b/tests/api/test_grpc_stub.py index e31292297..09ba74af9 100644 --- a/tests/api/test_grpc_stub.py +++ b/tests/api/test_grpc_stub.py @@ -1,5 +1,7 @@ from datetime import timedelta -from typing import Mapping +from typing import Mapping, Any, cast +import pytest +import re from google.protobuf.empty_pb2 import Empty from google.protobuf.timestamp_pb2 import Timestamp @@ -153,7 +155,7 @@ async def test_grpc_metadata(): # Binary metadata should be configurable on the client: client.rpc_metadata = { "my-binary-key-bin": b"\x00\x01", - "my-binary-key-bin2": b"\x02\x03", + "my-binary-key2-bin": b"\x02\x03", } await client.workflow_service.get_system_info( GetSystemInfoRequest(), @@ -165,7 +167,100 @@ async def test_grpc_metadata(): { "authorization": "Bearer my-api-key", "my-binary-key-bin": b"abc", - "my-binary-key-bin2": b"\x02\x03", + "my-binary-key2-bin": b"\x02\x03", + } + ) + + # Setting invalid RPC metadata should raise: + with pytest.raises( + ValueError, + match="Invalid binary header key 'my-ascii-key': invalid gRPC metadata key name", + ): + client.rpc_metadata = { + "my-ascii-key": b"binary-value", + } + with pytest.raises( + ValueError, + match="Invalid ASCII header key 'my-binary-key-bin': invalid gRPC metadata key name", + ): + client.rpc_metadata = { + "my-binary-key-bin": "ascii-value", + } + + # Making a request with invalid RPC metadata should raise: + with pytest.raises( + ValueError, + match="Invalid metadata value for ASCII key my-ascii-key: expected str", + ): + await client.workflow_service.get_system_info( + GetSystemInfoRequest(), + metadata={ + "my-ascii-key": b"binary-value", + }, + ) + with pytest.raises( + ValueError, + match="Invalid metadata value for binary key my-binary-key-bin: expected bytes", + ): + await client.workflow_service.get_system_info( + GetSystemInfoRequest(), + metadata={ + "my-binary-key-bin": "ascii-value", + }, + ) + + # Passing in non-`str | bytes` should raise: + with pytest.raises(TypeError) as err: + await client.workflow_service.get_system_info( + GetSystemInfoRequest(), + metadata={ + # Not a valid header: + "my-int-key": cast(Any, 256), + }, + ) + cause = err.value.__cause__ + assert isinstance(cause, TypeError) + assert re.match( + re.escape(r"failed to extract enum RpcMetadataValue ('str | bytes')"), + str(cause), + ) + with pytest.raises( + TypeError, + match=re.escape(r"failed to extract enum RpcMetadataValue ('str | bytes')"), + ) as err: + client.rpc_metadata = { + "my-binary-key-bin": cast(Any, 256), + } + + # Setting invalid RPC metadata in a mixed client will partially fail: + client.rpc_metadata = { + "x-my-binary-bin": b"\x00", + "x-my-ascii": "foo", + } + assert client.rpc_metadata == { + "x-my-binary-bin": b"\x00", + "x-my-ascii": "foo", + } + with pytest.raises( + ValueError, + match="Invalid binary header key 'x-invalid-ascii-with-bin-value': invalid gRPC metadata key name", + ): + client.rpc_metadata = { + "x-invalid-ascii-with-bin-value": b"not-ascii", + "x-my-ascii": "bar", + } + assert client.rpc_metadata == { + "x-my-binary-bin": b"\x00", + "x-my-ascii": "foo", + } + await client.workflow_service.get_system_info(GetSystemInfoRequest()) + workflow_server.assert_last_metadata( + { + "authorization": "Bearer 
my-api-key", + # This is inconsistent with what `client.rpc_metadata` returns + # (`x-my-ascii` was updated): + "x-my-binary-bin": b"\x00", + "x-my-ascii": "bar", } ) From 54487f58999b054cb22d6aba2a6aa21d7478945b Mon Sep 17 00:00:00 2001 From: Joseph Azevedo Date: Fri, 12 Sep 2025 09:40:09 -0700 Subject: [PATCH 10/12] use older-style union syntax for Python 3.9 compatability --- temporalio/bridge/client.py | 10 +- temporalio/client.py | 222 ++++++++++++------------- temporalio/nexus/_operation_context.py | 10 +- temporalio/service.py | 14 +- temporalio/testing/_workflow.py | 6 +- tests/api/test_grpc_stub.py | 6 +- tests/test_client.py | 4 +- tests/worker/test_update_with_start.py | 4 +- 8 files changed, 138 insertions(+), 138 deletions(-) diff --git a/temporalio/bridge/client.py b/temporalio/bridge/client.py index 83afed086..dafd6fb71 100644 --- a/temporalio/bridge/client.py +++ b/temporalio/bridge/client.py @@ -7,7 +7,7 @@ from dataclasses import dataclass from datetime import timedelta -from typing import Mapping, Optional, Tuple, Type, TypeVar +from typing import Mapping, Optional, Tuple, Type, TypeVar, Union import google.protobuf.message @@ -59,7 +59,7 @@ class ClientConfig: """Python representation of the Rust struct for configuring the client.""" target_url: str - metadata: Mapping[str, str | bytes] + metadata: Mapping[str, Union[str, bytes]] api_key: Optional[str] identity: str tls_config: Optional[ClientTlsConfig] @@ -77,7 +77,7 @@ class RpcCall: rpc: str req: bytes retry: bool - metadata: Mapping[str, str | bytes] + metadata: Mapping[str, Union[str, bytes]] timeout_millis: Optional[int] @@ -108,7 +108,7 @@ def __init__( self._runtime = runtime self._ref = ref - def update_metadata(self, metadata: Mapping[str, str | bytes]) -> None: + def update_metadata(self, metadata: Mapping[str, Union[str, bytes]]) -> None: """Update underlying metadata on Core client.""" self._ref.update_metadata(metadata) @@ -124,7 +124,7 @@ async def call( req: google.protobuf.message.Message, resp_type: Type[ProtoMessage], retry: bool, - metadata: Mapping[str, str | bytes], + metadata: Mapping[str, Union[str, bytes]], timeout: Optional[timedelta], ) -> ProtoMessage: """Make RPC call using SDK Core.""" diff --git a/temporalio/client.py b/temporalio/client.py index b5f7ea93b..977395c5a 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -118,7 +118,7 @@ async def connect( tls: Union[bool, TLSConfig] = False, retry_config: Optional[RetryConfig] = None, keep_alive_config: Optional[KeepAliveConfig] = KeepAliveConfig.default, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, identity: Optional[str] = None, lazy: bool = False, runtime: Optional[temporalio.runtime.Runtime] = None, @@ -296,7 +296,7 @@ def data_converter(self) -> temporalio.converter.DataConverter: return self._config["data_converter"] @property - def rpc_metadata(self) -> Mapping[str, str | bytes]: + def rpc_metadata(self) -> Mapping[str, Union[str, bytes]]: """Headers for every call made by this client. Do not use mutate this mapping. Rather, set this property with an @@ -305,7 +305,7 @@ def rpc_metadata(self) -> Mapping[str, str | bytes]: return self.service_client.config.rpc_metadata @rpc_metadata.setter - def rpc_metadata(self, value: Mapping[str, str | bytes]) -> None: + def rpc_metadata(self, value: Mapping[str, Union[str, bytes]]) -> None: """Update the headers for this client. Do not mutate this mapping after set. 
Rather, set an entirely new @@ -359,7 +359,7 @@ async def start_workflow( start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, request_eager_start: bool = False, priority: temporalio.common.Priority = temporalio.common.Priority.default, @@ -394,7 +394,7 @@ async def start_workflow( start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, request_eager_start: bool = False, priority: temporalio.common.Priority = temporalio.common.Priority.default, @@ -431,7 +431,7 @@ async def start_workflow( start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, request_eager_start: bool = False, priority: temporalio.common.Priority = temporalio.common.Priority.default, @@ -468,7 +468,7 @@ async def start_workflow( start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, request_eager_start: bool = False, priority: temporalio.common.Priority = temporalio.common.Priority.default, @@ -503,7 +503,7 @@ async def start_workflow( start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, request_eager_start: bool = False, priority: temporalio.common.Priority = temporalio.common.Priority.default, @@ -640,7 +640,7 @@ async def execute_workflow( start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, request_eager_start: bool = False, priority: temporalio.common.Priority = temporalio.common.Priority.default, @@ -675,7 +675,7 @@ async def execute_workflow( start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, request_eager_start: bool = False, priority: temporalio.common.Priority = temporalio.common.Priority.default, @@ -712,7 +712,7 @@ async def execute_workflow( start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, request_eager_start: bool = False, priority: temporalio.common.Priority = temporalio.common.Priority.default, @@ -749,7 +749,7 @@ async def execute_workflow( start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], - rpc_metadata: Mapping[str, str | 
bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, request_eager_start: bool = False, priority: temporalio.common.Priority = temporalio.common.Priority.default, @@ -784,7 +784,7 @@ async def execute_workflow( start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, request_eager_start: bool = False, priority: temporalio.common.Priority = temporalio.common.Priority.default, @@ -898,7 +898,7 @@ async def execute_update_with_start_workflow( *, start_workflow_operation: WithStartWorkflowOperation[SelfType, Any], id: Optional[str] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> LocalReturnType: ... @@ -913,7 +913,7 @@ async def execute_update_with_start_workflow( *, start_workflow_operation: WithStartWorkflowOperation[SelfType, Any], id: Optional[str] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> LocalReturnType: ... @@ -928,7 +928,7 @@ async def execute_update_with_start_workflow( args: MultiParamSpec.args, # pyright: ignore start_workflow_operation: WithStartWorkflowOperation[SelfType, Any], id: Optional[str] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> LocalReturnType: ... @@ -943,7 +943,7 @@ async def execute_update_with_start_workflow( args: Sequence[Any] = [], id: Optional[str] = None, result_type: Optional[Type] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> Any: ... @@ -956,7 +956,7 @@ async def execute_update_with_start_workflow( args: Sequence[Any] = [], id: Optional[str] = None, result_type: Optional[Type] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> Any: """Send an update-with-start request and wait for the update to complete. @@ -1016,7 +1016,7 @@ async def start_update_with_start_workflow( start_workflow_operation: WithStartWorkflowOperation[SelfType, Any], wait_for_stage: WorkflowUpdateStage, id: Optional[str] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowUpdateHandle[LocalReturnType]: ... @@ -1032,7 +1032,7 @@ async def start_update_with_start_workflow( start_workflow_operation: WithStartWorkflowOperation[SelfType, Any], wait_for_stage: WorkflowUpdateStage, id: Optional[str] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowUpdateHandle[LocalReturnType]: ... @@ -1048,7 +1048,7 @@ async def start_update_with_start_workflow( start_workflow_operation: WithStartWorkflowOperation[SelfType, Any], wait_for_stage: WorkflowUpdateStage, id: Optional[str] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowUpdateHandle[LocalReturnType]: ... 
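A minimal usage sketch of the client-level metadata surface touched in the hunks above (illustration only, not part of the diff; the target host, namespace, and header names are assumptions):

import asyncio

from temporalio.client import Client


async def main() -> None:
    # Binary headers use a key ending in "-bin" and a bytes value; all other
    # headers remain ASCII str values.
    client = await Client.connect(
        "localhost:7233",
        namespace="default",
        rpc_metadata={
            "my-ascii-header": "plain-text",
            "my-trace-context-bin": b"\x00\x01",
        },
    )
    # Replace the whole mapping rather than mutating it in place.
    client.rpc_metadata = {
        "my-ascii-header": "rotated",
        "my-trace-context-bin": b"\x02\x03",
    }


if __name__ == "__main__":
    asyncio.run(main())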
@@ -1064,7 +1064,7 @@ async def start_update_with_start_workflow( args: Sequence[Any] = [], id: Optional[str] = None, result_type: Optional[Type] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowUpdateHandle[Any]: ... @@ -1078,7 +1078,7 @@ async def start_update_with_start_workflow( args: Sequence[Any] = [], id: Optional[str] = None, result_type: Optional[Type] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowUpdateHandle[Any]: """Send an update-with-start request and wait for it to be accepted. @@ -1144,7 +1144,7 @@ async def _start_update_with_start( id: Optional[str] = None, result_type: Optional[Type] = None, start_workflow_operation: WithStartWorkflowOperation[SelfType, ReturnType], - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowUpdateHandle[Any]: if wait_for_stage == WorkflowUpdateStage.ADMITTED: @@ -1203,7 +1203,7 @@ def list_workflows( limit: Optional[int] = None, page_size: int = 1000, next_page_token: Optional[bytes] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowExecutionAsyncIterator: """List workflows. @@ -1244,7 +1244,7 @@ def list_workflows( async def count_workflows( self, query: Optional[str] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowExecutionCount: """Count workflows. @@ -1333,7 +1333,7 @@ async def create_schedule( ] = None, static_summary: Optional[str] = None, static_details: Optional[str] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> ScheduleHandle: """Create a schedule and return its handle. @@ -1392,7 +1392,7 @@ async def list_schedules( *, page_size: int = 1000, next_page_token: Optional[bytes] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> ScheduleAsyncIterator: """List schedules. @@ -1432,7 +1432,7 @@ async def update_worker_build_id_compatibility( self, task_queue: str, operation: BuildIdOp, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: """Used to add new Build IDs or otherwise update the relative compatibility of Build Ids as @@ -1463,7 +1463,7 @@ async def get_worker_build_id_compatibility( self, task_queue: str, max_sets: Optional[int] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkerBuildIdVersionSets: """Get the Build ID compatibility sets for a specific task queue. 
@@ -1495,7 +1495,7 @@ async def get_worker_task_reachability( build_ids: Sequence[str], task_queues: Sequence[str] = [], reachability_type: Optional[TaskReachabilityType] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkerTaskReachability: """Determine if some Build IDs for certain Task Queues could have tasks dispatched to them. @@ -1643,7 +1643,7 @@ async def result( self, *, follow_runs: bool = True, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> ReturnType: """Wait for result of the workflow. @@ -1774,7 +1774,7 @@ async def result( async def cancel( self, *, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: """Cancel the workflow. @@ -1810,7 +1810,7 @@ async def cancel( async def describe( self, *, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowExecutionDescription: """Get workflow details. @@ -1849,7 +1849,7 @@ async def fetch_history( *, event_filter_type: WorkflowHistoryEventFilterType = WorkflowHistoryEventFilterType.ALL_EVENT, skip_archival: bool = False, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowHistory: """Get workflow history. @@ -1878,7 +1878,7 @@ def fetch_history_events( wait_new_event: bool = False, event_filter_type: WorkflowHistoryEventFilterType = WorkflowHistoryEventFilterType.ALL_EVENT, skip_archival: bool = False, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowHistoryEventAsyncIterator: """Get workflow history events as an async iterator. @@ -1920,7 +1920,7 @@ def _fetch_history_events_for_run( wait_new_event: bool = False, event_filter_type: WorkflowHistoryEventFilterType = WorkflowHistoryEventFilterType.ALL_EVENT, skip_archival: bool = False, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowHistoryEventAsyncIterator: return self._client._impl.fetch_workflow_history_events( @@ -1944,7 +1944,7 @@ async def query( query: MethodSyncOrAsyncNoParam[SelfType, LocalReturnType], *, reject_condition: Optional[temporalio.common.QueryRejectCondition] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> LocalReturnType: ... @@ -1956,7 +1956,7 @@ async def query( arg: ParamType, *, reject_condition: Optional[temporalio.common.QueryRejectCondition] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> LocalReturnType: ... @@ -1971,7 +1971,7 @@ async def query( *, args: Sequence[Any], reject_condition: Optional[temporalio.common.QueryRejectCondition] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> LocalReturnType: ... 
@@ -1985,7 +1985,7 @@ async def query( args: Sequence[Any] = [], result_type: Optional[Type] = None, reject_condition: Optional[temporalio.common.QueryRejectCondition] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> Any: ... @@ -1997,7 +1997,7 @@ async def query( args: Sequence[Any] = [], result_type: Optional[Type] = None, reject_condition: Optional[temporalio.common.QueryRejectCondition] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> Any: """Query the workflow. @@ -2068,7 +2068,7 @@ async def signal( self, signal: MethodSyncOrAsyncNoParam[SelfType, None], *, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: ... @@ -2079,7 +2079,7 @@ async def signal( signal: MethodSyncOrAsyncSingleParam[SelfType, ParamType, None], arg: ParamType, *, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: ... @@ -2092,7 +2092,7 @@ async def signal( ], *, args: Sequence[Any], - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: ... @@ -2104,7 +2104,7 @@ async def signal( arg: Any = temporalio.common._arg_unset, *, args: Sequence[Any] = [], - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: ... @@ -2114,7 +2114,7 @@ async def signal( arg: Any = temporalio.common._arg_unset, *, args: Sequence[Any] = [], - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: """Send a signal to the workflow. @@ -2157,7 +2157,7 @@ async def terminate( self, *args: Any, reason: Optional[str] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: """Terminate the workflow. @@ -2201,7 +2201,7 @@ async def execute_update( update: temporalio.workflow.UpdateMethodMultiParam[[SelfType], LocalReturnType], *, id: Optional[str] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> LocalReturnType: ... @@ -2215,7 +2215,7 @@ async def execute_update( arg: ParamType, *, id: Optional[str] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> LocalReturnType: ... @@ -2229,7 +2229,7 @@ async def execute_update( *, args: MultiParamSpec.args, # pyright: ignore id: Optional[str] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> LocalReturnType: ... @@ -2243,7 +2243,7 @@ async def execute_update( args: Sequence[Any] = [], id: Optional[str] = None, result_type: Optional[Type] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> Any: ... 
@@ -2255,7 +2255,7 @@ async def execute_update( args: Sequence[Any] = [], id: Optional[str] = None, result_type: Optional[Type] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> Any: """Send an update request to the workflow and wait for it to complete. @@ -2301,7 +2301,7 @@ async def start_update( *, wait_for_stage: WorkflowUpdateStage, id: Optional[str] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowUpdateHandle[LocalReturnType]: ... @@ -2316,7 +2316,7 @@ async def start_update( *, wait_for_stage: WorkflowUpdateStage, id: Optional[str] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowUpdateHandle[LocalReturnType]: ... @@ -2331,7 +2331,7 @@ async def start_update( args: MultiParamSpec.args, # pyright: ignore wait_for_stage: WorkflowUpdateStage, id: Optional[str] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowUpdateHandle[LocalReturnType]: ... @@ -2346,7 +2346,7 @@ async def start_update( args: Sequence[Any] = [], id: Optional[str] = None, result_type: Optional[Type] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowUpdateHandle[Any]: ... @@ -2359,7 +2359,7 @@ async def start_update( args: Sequence[Any] = [], id: Optional[str] = None, result_type: Optional[Type] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowUpdateHandle[Any]: """Send an update request to the workflow and return a handle to it. 
@@ -2407,7 +2407,7 @@ async def _start_update( args: Sequence[Any] = [], id: Optional[str] = None, result_type: Optional[Type] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> WorkflowUpdateHandle[Any]: if wait_for_stage == WorkflowUpdateStage.ADMITTED: @@ -2521,7 +2521,7 @@ def __init__( static_summary: Optional[str] = None, static_details: Optional[str] = None, start_delay: Optional[timedelta] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, priority: temporalio.common.Priority = temporalio.common.Priority.default, versioning_override: Optional[temporalio.common.VersioningOverride] = None, @@ -2553,7 +2553,7 @@ def __init__( static_summary: Optional[str] = None, static_details: Optional[str] = None, start_delay: Optional[timedelta] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, priority: temporalio.common.Priority = temporalio.common.Priority.default, versioning_override: Optional[temporalio.common.VersioningOverride] = None, @@ -2587,7 +2587,7 @@ def __init__( static_summary: Optional[str] = None, static_details: Optional[str] = None, start_delay: Optional[timedelta] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, priority: temporalio.common.Priority = temporalio.common.Priority.default, versioning_override: Optional[temporalio.common.VersioningOverride] = None, @@ -2621,7 +2621,7 @@ def __init__( static_summary: Optional[str] = None, static_details: Optional[str] = None, start_delay: Optional[timedelta] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, priority: temporalio.common.Priority = temporalio.common.Priority.default, versioning_override: Optional[temporalio.common.VersioningOverride] = None, @@ -2653,7 +2653,7 @@ def __init__( static_summary: Optional[str] = None, static_details: Optional[str] = None, start_delay: Optional[timedelta] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, priority: temporalio.common.Priority = temporalio.common.Priority.default, versioning_override: Optional[temporalio.common.VersioningOverride] = None, @@ -2727,7 +2727,7 @@ def __init__( async def heartbeat( self, *details: Any, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: """Record a heartbeat for the activity. @@ -2751,7 +2751,7 @@ async def complete( self, result: Optional[Any] = temporalio.common._arg_unset, *, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: """Complete the activity. @@ -2776,7 +2776,7 @@ async def fail( error: Exception, *, last_heartbeat_details: Sequence[Any] = [], - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: """Fail the activity. 
@@ -2801,7 +2801,7 @@ async def fail( async def report_cancellation( self, *details: Any, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: """Report the activity as cancelled. @@ -3223,7 +3223,7 @@ async def map_histories( *, event_filter_type: WorkflowHistoryEventFilterType = WorkflowHistoryEventFilterType.ALL_EVENT, skip_archival: bool = False, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> AsyncIterator[WorkflowHistory]: """Create an async iterator consuming all workflows and calling @@ -3426,7 +3426,7 @@ def __init__(self, client: Client, id: str) -> None: async def backfill( self, *backfill: ScheduleBackfill, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: """Backfill the schedule by going through the specified time periods as @@ -3452,7 +3452,7 @@ async def backfill( async def delete( self, *, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: """Delete this schedule. @@ -3473,7 +3473,7 @@ async def delete( async def describe( self, *, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> ScheduleDescription: """Fetch this schedule's description. @@ -3495,7 +3495,7 @@ async def pause( self, *, note: Optional[str] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: """Pause the schedule and set a note. @@ -3519,7 +3519,7 @@ async def trigger( self, *, overlap: Optional[ScheduleOverlapPolicy] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: """Trigger an action on this schedule to happen immediately. @@ -3543,7 +3543,7 @@ async def unpause( self, *, note: Optional[str] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: """Unpause the schedule and set a note. @@ -3568,7 +3568,7 @@ async def update( self, updater: Callable[[ScheduleUpdateInput], Optional[ScheduleUpdate]], *, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: ... @@ -3577,7 +3577,7 @@ async def update( self, updater: Callable[[ScheduleUpdateInput], Awaitable[Optional[ScheduleUpdate]]], *, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: ... 
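The per-call rpc_metadata parameters in the hunks above accept the same mix of ASCII and binary entries; a short sketch (illustration only; the workflow ID, signal name, and header names are assumptions):

from temporalio.client import Client


async def signal_with_binary_metadata(client: Client) -> None:
    handle = client.get_workflow_handle("my-workflow-id")
    await handle.signal(
        "my_signal",
        rpc_metadata={
            "x-request-id": "abc-123",       # ASCII header, str value
            "x-trace-context-bin": b"\x9f",  # binary header, bytes value, key ends in "-bin"
        },
    )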
@@ -3588,7 +3588,7 @@ async def update( Union[Optional[ScheduleUpdate], Awaitable[Optional[ScheduleUpdate]]], ], *, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: """Update a schedule using a callback to build the update from the @@ -5005,7 +5005,7 @@ def workflow_run_id(self) -> Optional[str]: async def result( self, *, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> LocalReturnType: """Wait for and return the result of the update. The result may already be known in which case no network call @@ -5051,7 +5051,7 @@ async def result( async def _poll_until_outcome( self, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> None: if self._known_outcome: @@ -5249,7 +5249,7 @@ class StartWorkflowInput: static_details: Optional[str] # Type may be absent ret_type: Optional[Type] - rpc_metadata: Mapping[str, str | bytes] + rpc_metadata: Mapping[str, Union[str, bytes]] rpc_timeout: Optional[timedelta] request_eager_start: bool priority: temporalio.common.Priority @@ -5267,7 +5267,7 @@ class CancelWorkflowInput: id: str run_id: Optional[str] first_execution_run_id: Optional[str] - rpc_metadata: Mapping[str, str | bytes] + rpc_metadata: Mapping[str, Union[str, bytes]] rpc_timeout: Optional[timedelta] @@ -5277,7 +5277,7 @@ class DescribeWorkflowInput: id: str run_id: Optional[str] - rpc_metadata: Mapping[str, str | bytes] + rpc_metadata: Mapping[str, Union[str, bytes]] rpc_timeout: Optional[timedelta] @@ -5292,7 +5292,7 @@ class FetchWorkflowHistoryEventsInput: wait_new_event: bool event_filter_type: WorkflowHistoryEventFilterType skip_archival: bool - rpc_metadata: Mapping[str, str | bytes] + rpc_metadata: Mapping[str, Union[str, bytes]] rpc_timeout: Optional[timedelta] @@ -5303,7 +5303,7 @@ class ListWorkflowsInput: query: Optional[str] page_size: int next_page_token: Optional[bytes] - rpc_metadata: Mapping[str, str | bytes] + rpc_metadata: Mapping[str, Union[str, bytes]] rpc_timeout: Optional[timedelta] limit: Optional[int] @@ -5313,7 +5313,7 @@ class CountWorkflowsInput: """Input for :py:meth:`OutboundInterceptor.count_workflows`.""" query: Optional[str] - rpc_metadata: Mapping[str, str | bytes] + rpc_metadata: Mapping[str, Union[str, bytes]] rpc_timeout: Optional[timedelta] @@ -5329,7 +5329,7 @@ class QueryWorkflowInput: headers: Mapping[str, temporalio.api.common.v1.Payload] # Type may be absent ret_type: Optional[Type] - rpc_metadata: Mapping[str, str | bytes] + rpc_metadata: Mapping[str, Union[str, bytes]] rpc_timeout: Optional[timedelta] @@ -5342,7 +5342,7 @@ class SignalWorkflowInput: signal: str args: Sequence[Any] headers: Mapping[str, temporalio.api.common.v1.Payload] - rpc_metadata: Mapping[str, str | bytes] + rpc_metadata: Mapping[str, Union[str, bytes]] rpc_timeout: Optional[timedelta] @@ -5355,7 +5355,7 @@ class TerminateWorkflowInput: first_execution_run_id: Optional[str] args: Sequence[Any] reason: Optional[str] - rpc_metadata: Mapping[str, str | bytes] + rpc_metadata: Mapping[str, Union[str, bytes]] rpc_timeout: Optional[timedelta] @@ -5372,7 +5372,7 @@ class StartWorkflowUpdateInput: wait_for_stage: WorkflowUpdateStage headers: Mapping[str, temporalio.api.common.v1.Payload] ret_type: Optional[Type] - rpc_metadata: Mapping[str, str | bytes] + rpc_metadata: Mapping[str, Union[str, 
bytes]] rpc_timeout: Optional[timedelta] @@ -5386,7 +5386,7 @@ class UpdateWithStartUpdateWorkflowInput: wait_for_stage: WorkflowUpdateStage headers: Mapping[str, temporalio.api.common.v1.Payload] ret_type: Optional[Type] - rpc_metadata: Mapping[str, str | bytes] + rpc_metadata: Mapping[str, Union[str, bytes]] rpc_timeout: Optional[timedelta] @@ -5420,7 +5420,7 @@ class UpdateWithStartStartWorkflowInput: static_details: Optional[str] # Type may be absent ret_type: Optional[Type] - rpc_metadata: Mapping[str, str | bytes] + rpc_metadata: Mapping[str, Union[str, bytes]] rpc_timeout: Optional[timedelta] priority: temporalio.common.Priority versioning_override: Optional[temporalio.common.VersioningOverride] = None @@ -5444,7 +5444,7 @@ class HeartbeatAsyncActivityInput: id_or_token: Union[AsyncActivityIDReference, bytes] details: Sequence[Any] - rpc_metadata: Mapping[str, str | bytes] + rpc_metadata: Mapping[str, Union[str, bytes]] rpc_timeout: Optional[timedelta] @@ -5454,7 +5454,7 @@ class CompleteAsyncActivityInput: id_or_token: Union[AsyncActivityIDReference, bytes] result: Optional[Any] - rpc_metadata: Mapping[str, str | bytes] + rpc_metadata: Mapping[str, Union[str, bytes]] rpc_timeout: Optional[timedelta] @@ -5465,7 +5465,7 @@ class FailAsyncActivityInput: id_or_token: Union[AsyncActivityIDReference, bytes] error: Exception last_heartbeat_details: Sequence[Any] - rpc_metadata: Mapping[str, str | bytes] + rpc_metadata: Mapping[str, Union[str, bytes]] rpc_timeout: Optional[timedelta] @@ -5475,7 +5475,7 @@ class ReportCancellationAsyncActivityInput: id_or_token: Union[AsyncActivityIDReference, bytes] details: Sequence[Any] - rpc_metadata: Mapping[str, str | bytes] + rpc_metadata: Mapping[str, Union[str, bytes]] rpc_timeout: Optional[timedelta] @@ -5493,7 +5493,7 @@ class CreateScheduleInput: temporalio.common.SearchAttributes, temporalio.common.TypedSearchAttributes ] ] - rpc_metadata: Mapping[str, str | bytes] + rpc_metadata: Mapping[str, Union[str, bytes]] rpc_timeout: Optional[timedelta] @@ -5503,7 +5503,7 @@ class ListSchedulesInput: page_size: int next_page_token: Optional[bytes] - rpc_metadata: Mapping[str, str | bytes] + rpc_metadata: Mapping[str, Union[str, bytes]] rpc_timeout: Optional[timedelta] query: Optional[str] = None @@ -5514,7 +5514,7 @@ class BackfillScheduleInput: id: str backfills: Sequence[ScheduleBackfill] - rpc_metadata: Mapping[str, str | bytes] + rpc_metadata: Mapping[str, Union[str, bytes]] rpc_timeout: Optional[timedelta] @@ -5523,7 +5523,7 @@ class DeleteScheduleInput: """Input for :py:meth:`OutboundInterceptor.delete_schedule`.""" id: str - rpc_metadata: Mapping[str, str | bytes] + rpc_metadata: Mapping[str, Union[str, bytes]] rpc_timeout: Optional[timedelta] @@ -5532,7 +5532,7 @@ class DescribeScheduleInput: """Input for :py:meth:`OutboundInterceptor.describe_schedule`.""" id: str - rpc_metadata: Mapping[str, str | bytes] + rpc_metadata: Mapping[str, Union[str, bytes]] rpc_timeout: Optional[timedelta] @@ -5542,7 +5542,7 @@ class PauseScheduleInput: id: str note: Optional[str] - rpc_metadata: Mapping[str, str | bytes] + rpc_metadata: Mapping[str, Union[str, bytes]] rpc_timeout: Optional[timedelta] @@ -5552,7 +5552,7 @@ class TriggerScheduleInput: id: str overlap: Optional[ScheduleOverlapPolicy] - rpc_metadata: Mapping[str, str | bytes] + rpc_metadata: Mapping[str, Union[str, bytes]] rpc_timeout: Optional[timedelta] @@ -5562,7 +5562,7 @@ class UnpauseScheduleInput: id: str note: Optional[str] - rpc_metadata: Mapping[str, str | bytes] + rpc_metadata: Mapping[str, 
Union[str, bytes]] rpc_timeout: Optional[timedelta] @@ -5575,7 +5575,7 @@ class UpdateScheduleInput: [ScheduleUpdateInput], Union[Optional[ScheduleUpdate], Awaitable[Optional[ScheduleUpdate]]], ] - rpc_metadata: Mapping[str, str | bytes] + rpc_metadata: Mapping[str, Union[str, bytes]] rpc_timeout: Optional[timedelta] @@ -5585,7 +5585,7 @@ class UpdateWorkerBuildIdCompatibilityInput: task_queue: str operation: BuildIdOp - rpc_metadata: Mapping[str, str | bytes] + rpc_metadata: Mapping[str, Union[str, bytes]] rpc_timeout: Optional[timedelta] @@ -5595,7 +5595,7 @@ class GetWorkerBuildIdCompatibilityInput: task_queue: str max_sets: Optional[int] - rpc_metadata: Mapping[str, str | bytes] + rpc_metadata: Mapping[str, Union[str, bytes]] rpc_timeout: Optional[timedelta] @@ -5606,7 +5606,7 @@ class GetWorkerTaskReachabilityInput: build_ids: Sequence[str] task_queues: Sequence[str] reachability: Optional[TaskReachabilityType] - rpc_metadata: Mapping[str, str | bytes] + rpc_metadata: Mapping[str, Union[str, bytes]] rpc_timeout: Optional[timedelta] @@ -7210,7 +7210,7 @@ async def connect( tls: Union[bool, TLSConfig] = True, retry_config: Optional[RetryConfig] = None, keep_alive_config: Optional[KeepAliveConfig] = KeepAliveConfig.default, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, identity: Optional[str] = None, lazy: bool = False, runtime: Optional[temporalio.runtime.Runtime] = None, @@ -7302,7 +7302,7 @@ def identity(self) -> str: return self._service_client.config.identity @property - def rpc_metadata(self) -> Mapping[str, str | bytes]: + def rpc_metadata(self) -> Mapping[str, Union[str, bytes]]: """Headers for every call made by this client. Do not use mutate this mapping. Rather, set this property with an @@ -7312,7 +7312,7 @@ def rpc_metadata(self) -> Mapping[str, str | bytes]: return self.service_client.config.rpc_metadata @rpc_metadata.setter - def rpc_metadata(self, value: Mapping[str, str | bytes]) -> None: + def rpc_metadata(self, value: Mapping[str, Union[str, bytes]]) -> None: """Update the headers for this client. Do not mutate this mapping after set. 
Rather, set an entirely new diff --git a/temporalio/nexus/_operation_context.py b/temporalio/nexus/_operation_context.py index 2d9ff80ec..ea76b8486 100644 --- a/temporalio/nexus/_operation_context.py +++ b/temporalio/nexus/_operation_context.py @@ -244,7 +244,7 @@ async def start_workflow( start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, request_eager_start: bool = False, priority: temporalio.common.Priority = temporalio.common.Priority.default, @@ -279,7 +279,7 @@ async def start_workflow( start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, request_eager_start: bool = False, priority: temporalio.common.Priority = temporalio.common.Priority.default, @@ -316,7 +316,7 @@ async def start_workflow( start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, request_eager_start: bool = False, priority: temporalio.common.Priority = temporalio.common.Priority.default, @@ -353,7 +353,7 @@ async def start_workflow( start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, request_eager_start: bool = False, priority: temporalio.common.Priority = temporalio.common.Priority.default, @@ -388,7 +388,7 @@ async def start_workflow( start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, request_eager_start: bool = False, priority: temporalio.common.Priority = temporalio.common.Priority.default, diff --git a/temporalio/service.py b/temporalio/service.py index c9a5531b4..5f36c3ae5 100644 --- a/temporalio/service.py +++ b/temporalio/service.py @@ -143,7 +143,7 @@ class ConnectConfig: tls: Union[bool, TLSConfig] = False retry_config: Optional[RetryConfig] = None keep_alive_config: Optional[KeepAliveConfig] = KeepAliveConfig.default - rpc_metadata: Mapping[str, str | bytes] = field(default_factory=dict) + rpc_metadata: Mapping[str, Union[str, bytes]] = field(default_factory=dict) identity: str = "" lazy: bool = False runtime: Optional[temporalio.runtime.Runtime] = None @@ -232,7 +232,7 @@ async def check_health( *, service: str = "temporal.api.workflowservice.v1.WorkflowService", retry: bool = False, - metadata: Mapping[str, str | bytes] = {}, + metadata: Mapping[str, Union[str, bytes]] = {}, timeout: Optional[timedelta] = None, ) -> bool: """Check whether the WorkflowService is up. 
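For callers that want to fail fast before handing a mapping to the service client, a sketch of the key/value rule the new tests and docstring describe, i.e. '-bin' keys take bytes and all other keys take str (illustration only, not part of the SDK or this patch):

from typing import Mapping, Union


def check_rpc_metadata(metadata: Mapping[str, Union[str, bytes]]) -> None:
    """Reject mappings that mix up the ASCII/binary gRPC metadata rules."""
    for key, value in metadata.items():
        if key.endswith("-bin"):
            if not isinstance(value, bytes):
                raise ValueError(f"binary header {key!r} requires a bytes value")
        elif not isinstance(value, str):
            raise ValueError(f"ASCII header {key!r} requires a str value")


check_rpc_metadata({"authorization": "Bearer token", "my-trace-bin": b"\x00"})  # ok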
@@ -264,7 +264,7 @@ def worker_service_client(self) -> _BridgeServiceClient: raise NotImplementedError @abstractmethod - def update_rpc_metadata(self, metadata: Mapping[str, str | bytes]) -> None: + def update_rpc_metadata(self, metadata: Mapping[str, Union[str, bytes]]) -> None: """Update service client's RPC metadata.""" raise NotImplementedError @@ -282,7 +282,7 @@ async def _rpc_call( *, service: str, retry: bool, - metadata: Mapping[str, str | bytes], + metadata: Mapping[str, Union[str, bytes]], timeout: Optional[timedelta], ) -> ServiceResponse: raise NotImplementedError @@ -1257,7 +1257,7 @@ async def __call__( req: ServiceRequest, *, retry: bool = False, - metadata: Mapping[str, str | bytes] = {}, + metadata: Mapping[str, Union[str, bytes]] = {}, timeout: Optional[timedelta] = None, ) -> ServiceResponse: """Invoke underlying client with the given request. @@ -1316,7 +1316,7 @@ def worker_service_client(self) -> _BridgeServiceClient: """Underlying service client.""" return self - def update_rpc_metadata(self, metadata: Mapping[str, str | bytes]) -> None: + def update_rpc_metadata(self, metadata: Mapping[str, Union[str, bytes]]) -> None: """Update Core client metadata.""" # Mutate the bridge config and then only mutate the running client # metadata if already connected @@ -1340,7 +1340,7 @@ async def _rpc_call( *, service: str, retry: bool, - metadata: Mapping[str, str | bytes], + metadata: Mapping[str, Union[str, bytes]], timeout: Optional[timedelta], ) -> ServiceResponse: global LOG_PROTOS diff --git a/temporalio/testing/_workflow.py b/temporalio/testing/_workflow.py index 513603905..7c74f5cc6 100644 --- a/temporalio/testing/_workflow.py +++ b/temporalio/testing/_workflow.py @@ -84,7 +84,7 @@ async def start_local( temporalio.common.QueryRejectCondition ] = None, retry_config: Optional[temporalio.client.RetryConfig] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, identity: Optional[str] = None, tls: bool | temporalio.client.TLSConfig = False, ip: str = "127.0.0.1", @@ -244,7 +244,7 @@ async def start_time_skipping( temporalio.common.QueryRejectCondition ] = None, retry_config: Optional[temporalio.client.RetryConfig] = None, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, identity: Optional[str] = None, port: Optional[int] = None, download_dest_dir: Optional[str] = None, @@ -573,7 +573,7 @@ async def result( self, *, follow_runs: bool = True, - rpc_metadata: Mapping[str, str | bytes] = {}, + rpc_metadata: Mapping[str, Union[str, bytes]] = {}, rpc_timeout: Optional[timedelta] = None, ) -> Any: async with self.env.time_skipping_unlocked(): diff --git a/tests/api/test_grpc_stub.py b/tests/api/test_grpc_stub.py index 09ba74af9..1cb2c4bdf 100644 --- a/tests/api/test_grpc_stub.py +++ b/tests/api/test_grpc_stub.py @@ -1,5 +1,5 @@ from datetime import timedelta -from typing import Mapping, Any, cast +from typing import Mapping, Any, cast, Union import pytest import re @@ -38,9 +38,9 @@ def assert_time_remaining(context: ServicerContext, expected: int) -> None: class SimpleWorkflowServer(WorkflowServiceServicer): def __init__(self) -> None: super().__init__() - self.last_metadata: Mapping[str, str | bytes] = {} + self.last_metadata: Mapping[str, Union[str, bytes]] = {} - def assert_last_metadata(self, expected: Mapping[str, str | bytes]) -> None: + def assert_last_metadata(self, expected: Mapping[str, Union[str, bytes]]) -> None: for k, v in expected.items(): assert 
self.last_metadata.get(k) == v diff --git a/tests/test_client.py b/tests/test_client.py index ead3bfed0..40be83c68 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -4,7 +4,7 @@ import os import uuid from datetime import datetime, timedelta, timezone -from typing import Any, List, Mapping, Optional, Tuple, cast +from typing import Any, List, Mapping, Optional, Tuple, cast, Union from unittest import mock import google.protobuf.any_pb2 @@ -324,7 +324,7 @@ async def __call__( req: temporalio.api.workflowservice.v1.StartWorkflowExecutionRequest, *, retry: bool = False, - metadata: Mapping[str, str | bytes] = {}, + metadata: Mapping[str, Union[str, bytes]] = {}, timeout: Optional[timedelta] = None, ) -> temporalio.api.workflowservice.v1.StartWorkflowExecutionResponse: raise self.already_exists_err diff --git a/tests/worker/test_update_with_start.py b/tests/worker/test_update_with_start.py index b8766da82..c2f4b76e1 100644 --- a/tests/worker/test_update_with_start.py +++ b/tests/worker/test_update_with_start.py @@ -5,7 +5,7 @@ from dataclasses import dataclass from datetime import timedelta from enum import Enum, IntEnum -from typing import Any, Mapping, Optional +from typing import Any, Mapping, Optional, Union from unittest.mock import patch import temporalio.api.common.v1 @@ -822,7 +822,7 @@ async def __call__( req: temporalio.api.workflowservice.v1.ExecuteMultiOperationRequest, *, retry: bool = False, - metadata: Mapping[str, str | bytes] = {}, + metadata: Mapping[str, Union[str, bytes]] = {}, timeout: Optional[timedelta] = None, ) -> temporalio.api.workflowservice.v1.ExecuteMultiOperationResponse: raise self.empty_details_err From a905e763683e4ac9426012a429d6b91f6c7e8562 Mon Sep 17 00:00:00 2001 From: Joseph Azevedo Date: Fri, 12 Sep 2025 10:20:15 -0700 Subject: [PATCH 11/12] poe format --- tests/api/test_grpc_stub.py | 6 +++--- tests/test_client.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/api/test_grpc_stub.py b/tests/api/test_grpc_stub.py index 1cb2c4bdf..4f456f4a8 100644 --- a/tests/api/test_grpc_stub.py +++ b/tests/api/test_grpc_stub.py @@ -1,8 +1,8 @@ -from datetime import timedelta -from typing import Mapping, Any, cast, Union -import pytest import re +from datetime import timedelta +from typing import Any, Mapping, Union, cast +import pytest from google.protobuf.empty_pb2 import Empty from google.protobuf.timestamp_pb2 import Timestamp from grpc.aio import ServicerContext diff --git a/tests/test_client.py b/tests/test_client.py index 40be83c68..e1df7258c 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -4,7 +4,7 @@ import os import uuid from datetime import datetime, timedelta, timezone -from typing import Any, List, Mapping, Optional, Tuple, cast, Union +from typing import Any, List, Mapping, Optional, Tuple, Union, cast from unittest import mock import google.protobuf.any_pb2 From 8b1dd3e7aba6d88df33e9b6eba9b4cb4652d92eb Mon Sep 17 00:00:00 2001 From: Joseph Azevedo Date: Fri, 12 Sep 2025 10:31:16 -0700 Subject: [PATCH 12/12] add raises/warning section to setter docs --- temporalio/client.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/temporalio/client.py b/temporalio/client.py index 977395c5a..f9735cfb2 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -310,6 +310,14 @@ def rpc_metadata(self, value: Mapping[str, Union[str, bytes]]) -> None: Do not mutate this mapping after set. Rather, set an entirely new mapping if changes are needed. 
+ + Raises: TypeError: a key/value pair is not valid gRPC ASCII or binary metadata. All binary metadata values must be supplied as bytes, and their keys must end in '-bin'. + + .. warning:: Attempting to set an invalid binary RPC metadata value may leave the client in an inconsistent state (as well as raise a :py:class:`TypeError`). """ # Update config and perform update # This may raise if the metadata is invalid: