From 5b8285cd5df51621f43a0bef53533a7cb3cb12c2 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Thu, 9 Oct 2025 12:23:53 -0700 Subject: [PATCH 1/8] initial draft of fix --- temporalio/client.py | 6 + temporalio/contrib/opentelemetry.py | 21 +++ tests/contrib/test_opentelemetry.py | 219 +++++++++++++++++++++------- 3 files changed, 190 insertions(+), 56 deletions(-) diff --git a/temporalio/client.py b/temporalio/client.py index 6c26d41ef..ba1e4e2ca 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -1208,6 +1208,7 @@ def on_start_error( start_workflow_input=start_workflow_operation._start_workflow_input, update_workflow_input=update_input, _on_start=on_start, + headers={}, _on_start_error=on_start_error, ) @@ -5538,6 +5539,7 @@ class StartWorkflowUpdateWithStartInput: [temporalio.api.workflowservice.v1.StartWorkflowExecutionResponse], None ] _on_start_error: Callable[[BaseException], None] + headers: Mapping[str, temporalio.api.common.v1.Payload] @dataclass @@ -6362,6 +6364,10 @@ def on_start( err: Optional[BaseException] = None try: + # fan headers out to commands + input.start_workflow_input.headers = input.headers + input.update_workflow_input.headers = input.headers + return await self._start_workflow_update_with_start( input.start_workflow_input, input.update_workflow_input, on_start ) diff --git a/temporalio/contrib/opentelemetry.py b/temporalio/contrib/opentelemetry.py index 04d40d544..f583b1fdc 100644 --- a/temporalio/contrib/opentelemetry.py +++ b/temporalio/contrib/opentelemetry.py @@ -292,6 +292,27 @@ async def start_workflow_update( ): return await super().start_workflow_update(input) + async def start_update_with_start_workflow( + self, input: temporalio.client.StartWorkflowUpdateWithStartInput + ) -> temporalio.client.WorkflowUpdateHandle[Any]: + attrs = { + "temporalWorkflowID": input.start_workflow_input.id, + } + if input.update_workflow_input.update_id is not None: + attrs["temporalUpdateID"] = input.update_workflow_input.update_id + + with self.root._start_as_current_span( + f"StartUpdateWithStartWorkflow:{input.start_workflow_input.workflow}", + attributes=attrs, + input=input, + kind=opentelemetry.trace.SpanKind.CLIENT, + ): + span = opentelemetry.trace.get_current_span().get_span_context() + print( + f"start_update_with_start_span {opentelemetry.trace.format_trace_id(span.trace_id)} {opentelemetry.trace.format_span_id(span.span_id)}" + ) + return await super().start_update_with_start_workflow(input) + class _TracingActivityInboundInterceptor(temporalio.worker.ActivityInboundInterceptor): def __init__( diff --git a/tests/contrib/test_opentelemetry.py b/tests/contrib/test_opentelemetry.py index 0b797f606..7b9ec247d 100644 --- a/tests/contrib/test_opentelemetry.py +++ b/tests/contrib/test_opentelemetry.py @@ -14,8 +14,8 @@ from opentelemetry.trace import StatusCode, get_tracer from temporalio import activity, workflow -from temporalio.client import Client -from temporalio.common import RetryPolicy +from temporalio.client import Client, WithStartWorkflowOperation +from temporalio.common import RetryPolicy, WorkflowIDConflictPolicy from temporalio.contrib.opentelemetry import TracingInterceptor from temporalio.contrib.opentelemetry import workflow as otel_workflow from temporalio.exceptions import ApplicationError, ApplicationErrorCategory @@ -55,6 +55,7 @@ class TracingWorkflowAction: continue_as_new: Optional[TracingWorkflowActionContinueAsNew] = None wait_until_signal_count: int = 0 wait_and_do_update: bool = False + wait_and_do_start_with_update: bool = False @dataclass @@ -79,6 +80,7 @@ class TracingWorkflowActionContinueAsNew: ready_for_update: asyncio.Semaphore +ready_for_update_with_start: asyncio.Semaphore @workflow.defn @@ -86,6 +88,7 @@ class TracingWorkflow: def __init__(self) -> None: self._signal_count = 0 self._did_update = False + self._did_update_with_start = False @workflow.run async def run(self, param: TracingWorkflowParam) -> None: @@ -140,6 +143,9 @@ async def run(self, param: TracingWorkflowParam) -> None: if action.wait_and_do_update: ready_for_update.release() await workflow.wait_condition(lambda: self._did_update) + if action.wait_and_do_start_with_update: + ready_for_update_with_start.release() + await workflow.wait_condition(lambda: self._did_update_with_start) async def _raise_on_non_replay(self) -> None: replaying = workflow.unsafe.is_replaying() @@ -161,6 +167,10 @@ def signal(self) -> None: def update(self) -> None: self._did_update = True + @workflow.update + def update_with_start(self) -> None: + self._did_update_with_start = True + @update.validator def update_validator(self) -> None: pass @@ -174,6 +184,8 @@ async def test_opentelemetry_tracing(client: Client, env: WorkflowEnvironment): ) global ready_for_update ready_for_update = asyncio.Semaphore(0) + global ready_for_update_with_start + ready_for_update_with_start = asyncio.Semaphore(0) # Create a tracer that has an in-memory exporter exporter = InMemorySpanExporter() provider = TracerProvider() @@ -195,63 +207,64 @@ async def test_opentelemetry_tracing(client: Client, env: WorkflowEnvironment): ): # Run workflow with various actions workflow_id = f"workflow_{uuid.uuid4()}" - handle = await client.start_workflow( - TracingWorkflow.run, - TracingWorkflowParam( - actions=[ - # First fail on replay - TracingWorkflowAction(fail_on_non_replay=True), - # Wait for a signal - TracingWorkflowAction(wait_until_signal_count=1), - # Exec activity that fails task before complete - TracingWorkflowAction( - activity=TracingWorkflowActionActivity( - param=TracingActivityParam(fail_until_attempt=2), - fail_on_non_replay_before_complete=True, - ), + workflow_params = TracingWorkflowParam( + actions=[ + # First fail on replay + TracingWorkflowAction(fail_on_non_replay=True), + # Wait for a signal + TracingWorkflowAction(wait_until_signal_count=1), + # Exec activity that fails task before complete + TracingWorkflowAction( + activity=TracingWorkflowActionActivity( + param=TracingActivityParam(fail_until_attempt=2), + fail_on_non_replay_before_complete=True, ), - # Wait for update - TracingWorkflowAction(wait_and_do_update=True), - # Exec child workflow that fails task before complete - TracingWorkflowAction( - child_workflow=TracingWorkflowActionChildWorkflow( - id=f"{workflow_id}_child", - # Exec activity and finish after two signals - param=TracingWorkflowParam( - actions=[ - TracingWorkflowAction( - activity=TracingWorkflowActionActivity( - param=TracingActivityParam(), - local=True, - ), + ), + # Wait for update + TracingWorkflowAction(wait_and_do_update=True), + # Exec child workflow that fails task before complete + TracingWorkflowAction( + child_workflow=TracingWorkflowActionChildWorkflow( + id=f"{workflow_id}_child", + # Exec activity and finish after two signals + param=TracingWorkflowParam( + actions=[ + TracingWorkflowAction( + activity=TracingWorkflowActionActivity( + param=TracingActivityParam(), + local=True, ), - # Wait for the two signals - TracingWorkflowAction(wait_until_signal_count=2), - ] - ), - signal=True, - external_signal=True, - fail_on_non_replay_before_complete=True, - ) - ), - # Continue as new and run one local activity - TracingWorkflowAction( - continue_as_new=TracingWorkflowActionContinueAsNew( - param=TracingWorkflowParam( - # Do a local activity in the continue as new - actions=[ - TracingWorkflowAction( - activity=TracingWorkflowActionActivity( - param=TracingActivityParam(), - local=True, - ), - ) - ] - ) + ), + # Wait for the two signals + TracingWorkflowAction(wait_until_signal_count=2), + ] + ), + signal=True, + external_signal=True, + fail_on_non_replay_before_complete=True, + ) + ), + # Continue as new and run one local activity + TracingWorkflowAction( + continue_as_new=TracingWorkflowActionContinueAsNew( + param=TracingWorkflowParam( + # Do a local activity in the continue as new + actions=[ + TracingWorkflowAction( + activity=TracingWorkflowActionActivity( + param=TracingActivityParam(), + local=True, + ), + ) + ] ) - ), - ], - ), + ) + ), + ], + ) + handle = await client.start_workflow( + TracingWorkflow.run, + workflow_params, id=workflow_id, task_queue=task_queue, ) @@ -301,6 +314,100 @@ async def test_opentelemetry_tracing(client: Client, env: WorkflowEnvironment): ] +async def test_opentelemetry_tracing_update_with_start( + client: Client, env: WorkflowEnvironment +): + # TODO(cretz): Fix + if env.supports_time_skipping: + pytest.skip( + "Java test server: https://github.com/temporalio/sdk-java/issues/1424" + ) + global ready_for_update + ready_for_update = asyncio.Semaphore(0) + global ready_for_update_with_start + ready_for_update_with_start = asyncio.Semaphore(0) + # Create a tracer that has an in-memory exporter + exporter = InMemorySpanExporter() + provider = TracerProvider() + provider.add_span_processor(SimpleSpanProcessor(exporter)) + tracer = get_tracer(__name__, tracer_provider=provider) + # Create new client with tracer interceptor + client_config = client.config() + client_config["interceptors"] = [TracingInterceptor(tracer)] + client = Client(**client_config) + + task_queue = f"task_queue_{uuid.uuid4()}" + async with Worker( + client, + task_queue=task_queue, + workflows=[TracingWorkflow], + activities=[tracing_activity], + # Needed so we can wait to send update at the right time + workflow_runner=UnsandboxedWorkflowRunner(), + ): + # Run workflow with various actions + workflow_id = f"workflow_{uuid.uuid4()}" + workflow_params = TracingWorkflowParam( + actions=[ + # Wait for update + TracingWorkflowAction(wait_and_do_start_with_update=True), + ] + ) + handle = await client.start_workflow( + TracingWorkflow.run, + workflow_params, + id=workflow_id, + task_queue=task_queue, + ) + async with ready_for_update_with_start: + start_op = WithStartWorkflowOperation( + TracingWorkflow.run, + workflow_params, + id=handle.id, + task_queue=task_queue, + id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING, + ) + await client.execute_update_with_start_workflow( + update=TracingWorkflow.update_with_start, + start_workflow_operation=start_op, + id=handle.id, + ) + await handle.result() + + # issue update with start again now that the former has completed + start_op = WithStartWorkflowOperation( + TracingWorkflow.run, + TracingWorkflowParam(actions=[]), + id="second-workflow", + task_queue=task_queue, + id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING, + ) + await client.execute_update_with_start_workflow( + update=TracingWorkflow.update_with_start, + start_workflow_operation=start_op, + id="second-workflow", + ) + + # Dump debug with attributes, but do string assertion test without + logging.debug( + "Spans:\n%s", + "\n".join(dump_spans(exporter.get_finished_spans(), with_attributes=False)), + ) + assert dump_spans(exporter.get_finished_spans(), with_attributes=False) == [ + "StartWorkflow:TracingWorkflow", + " RunWorkflow:TracingWorkflow", + " MyCustomSpan", + " HandleUpdate:update_with_start (links: StartUpdateWithStartWorkflow:TracingWorkflow)", + " CompleteWorkflow:TracingWorkflow", + "StartUpdateWithStartWorkflow:TracingWorkflow", + "StartUpdateWithStartWorkflow:TracingWorkflow", + " HandleUpdate:update_with_start (links: StartUpdateWithStartWorkflow:TracingWorkflow)", + " RunWorkflow:TracingWorkflow", + " MyCustomSpan", + " CompleteWorkflow:TracingWorkflow", + ] + + def dump_spans( spans: Iterable[ReadableSpan], *, From 0ceab44fda40212aa574e44a82ebb9c862cfae71 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Thu, 9 Oct 2025 13:57:50 -0700 Subject: [PATCH 2/8] remove debug printing --- temporalio/contrib/opentelemetry.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/temporalio/contrib/opentelemetry.py b/temporalio/contrib/opentelemetry.py index f583b1fdc..34a81a6fa 100644 --- a/temporalio/contrib/opentelemetry.py +++ b/temporalio/contrib/opentelemetry.py @@ -307,10 +307,6 @@ async def start_update_with_start_workflow( input=input, kind=opentelemetry.trace.SpanKind.CLIENT, ): - span = opentelemetry.trace.get_current_span().get_span_context() - print( - f"start_update_with_start_span {opentelemetry.trace.format_trace_id(span.trace_id)} {opentelemetry.trace.format_span_id(span.span_id)}" - ) return await super().start_update_with_start_workflow(input) From f373d7e095a328a179c8913ea8ef2ab289eea00f Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Thu, 9 Oct 2025 14:26:36 -0700 Subject: [PATCH 3/8] update comment about fan out to specify operation rather than command. Restore existing tracing test to original state --- temporalio/client.py | 2 +- tests/contrib/test_opentelemetry.py | 109 ++++++++++++++-------------- 2 files changed, 55 insertions(+), 56 deletions(-) diff --git a/temporalio/client.py b/temporalio/client.py index ba1e4e2ca..84fba9dbb 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -6364,7 +6364,7 @@ def on_start( err: Optional[BaseException] = None try: - # fan headers out to commands + # fan headers out to both operations input.start_workflow_input.headers = input.headers input.update_workflow_input.headers = input.headers diff --git a/tests/contrib/test_opentelemetry.py b/tests/contrib/test_opentelemetry.py index 7b9ec247d..f69ba345b 100644 --- a/tests/contrib/test_opentelemetry.py +++ b/tests/contrib/test_opentelemetry.py @@ -207,64 +207,63 @@ async def test_opentelemetry_tracing(client: Client, env: WorkflowEnvironment): ): # Run workflow with various actions workflow_id = f"workflow_{uuid.uuid4()}" - workflow_params = TracingWorkflowParam( - actions=[ - # First fail on replay - TracingWorkflowAction(fail_on_non_replay=True), - # Wait for a signal - TracingWorkflowAction(wait_until_signal_count=1), - # Exec activity that fails task before complete - TracingWorkflowAction( - activity=TracingWorkflowActionActivity( - param=TracingActivityParam(fail_until_attempt=2), - fail_on_non_replay_before_complete=True, - ), - ), - # Wait for update - TracingWorkflowAction(wait_and_do_update=True), - # Exec child workflow that fails task before complete - TracingWorkflowAction( - child_workflow=TracingWorkflowActionChildWorkflow( - id=f"{workflow_id}_child", - # Exec activity and finish after two signals - param=TracingWorkflowParam( - actions=[ - TracingWorkflowAction( - activity=TracingWorkflowActionActivity( - param=TracingActivityParam(), - local=True, - ), - ), - # Wait for the two signals - TracingWorkflowAction(wait_until_signal_count=2), - ] + handle = await client.start_workflow( + TracingWorkflow.run, + TracingWorkflowParam( + actions=[ + # First fail on replay + TracingWorkflowAction(fail_on_non_replay=True), + # Wait for a signal + TracingWorkflowAction(wait_until_signal_count=1), + # Exec activity that fails task before complete + TracingWorkflowAction( + activity=TracingWorkflowActionActivity( + param=TracingActivityParam(fail_until_attempt=2), + fail_on_non_replay_before_complete=True, ), - signal=True, - external_signal=True, - fail_on_non_replay_before_complete=True, - ) - ), - # Continue as new and run one local activity - TracingWorkflowAction( - continue_as_new=TracingWorkflowActionContinueAsNew( - param=TracingWorkflowParam( - # Do a local activity in the continue as new - actions=[ - TracingWorkflowAction( - activity=TracingWorkflowActionActivity( - param=TracingActivityParam(), - local=True, + ), + # Wait for update + TracingWorkflowAction(wait_and_do_update=True), + # Exec child workflow that fails task before complete + TracingWorkflowAction( + child_workflow=TracingWorkflowActionChildWorkflow( + id=f"{workflow_id}_child", + # Exec activity and finish after two signals + param=TracingWorkflowParam( + actions=[ + TracingWorkflowAction( + activity=TracingWorkflowActionActivity( + param=TracingActivityParam(), + local=True, + ), ), - ) - ] + # Wait for the two signals + TracingWorkflowAction(wait_until_signal_count=2), + ] + ), + signal=True, + external_signal=True, + fail_on_non_replay_before_complete=True, ) - ) - ), - ], - ) - handle = await client.start_workflow( - TracingWorkflow.run, - workflow_params, + ), + # Continue as new and run one local activity + TracingWorkflowAction( + continue_as_new=TracingWorkflowActionContinueAsNew( + param=TracingWorkflowParam( + # Do a local activity in the continue as new + actions=[ + TracingWorkflowAction( + activity=TracingWorkflowActionActivity( + param=TracingActivityParam(), + local=True, + ), + ) + ] + ) + ) + ), + ], + ), id=workflow_id, task_queue=task_queue, ) From f9785e40899441ded6f86e7c3b4ffddabf6747db Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Thu, 9 Oct 2025 14:28:27 -0700 Subject: [PATCH 4/8] remove copy/pasted todo --- tests/contrib/test_opentelemetry.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/contrib/test_opentelemetry.py b/tests/contrib/test_opentelemetry.py index f69ba345b..496ecdea1 100644 --- a/tests/contrib/test_opentelemetry.py +++ b/tests/contrib/test_opentelemetry.py @@ -316,7 +316,6 @@ async def test_opentelemetry_tracing(client: Client, env: WorkflowEnvironment): async def test_opentelemetry_tracing_update_with_start( client: Client, env: WorkflowEnvironment ): - # TODO(cretz): Fix if env.supports_time_skipping: pytest.skip( "Java test server: https://github.com/temporalio/sdk-java/issues/1424" From 5486f468eb80673115ff512e5868475e6efbe96e Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Thu, 9 Oct 2025 14:38:51 -0700 Subject: [PATCH 5/8] Clean up test a little bit --- tests/contrib/test_opentelemetry.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/tests/contrib/test_opentelemetry.py b/tests/contrib/test_opentelemetry.py index 496ecdea1..6ae3686cc 100644 --- a/tests/contrib/test_opentelemetry.py +++ b/tests/contrib/test_opentelemetry.py @@ -14,7 +14,7 @@ from opentelemetry.trace import StatusCode, get_tracer from temporalio import activity, workflow -from temporalio.client import Client, WithStartWorkflowOperation +from temporalio.client import Client, WithStartWorkflowOperation, WorkflowUpdateStage from temporalio.common import RetryPolicy, WorkflowIDConflictPolicy from temporalio.contrib.opentelemetry import TracingInterceptor from temporalio.contrib.opentelemetry import workflow as otel_workflow @@ -184,8 +184,6 @@ async def test_opentelemetry_tracing(client: Client, env: WorkflowEnvironment): ) global ready_for_update ready_for_update = asyncio.Semaphore(0) - global ready_for_update_with_start - ready_for_update_with_start = asyncio.Semaphore(0) # Create a tracer that has an in-memory exporter exporter = InMemorySpanExporter() provider = TracerProvider() @@ -320,8 +318,6 @@ async def test_opentelemetry_tracing_update_with_start( pytest.skip( "Java test server: https://github.com/temporalio/sdk-java/issues/1424" ) - global ready_for_update - ready_for_update = asyncio.Semaphore(0) global ready_for_update_with_start ready_for_update_with_start = asyncio.Semaphore(0) # Create a tracer that has an in-memory exporter @@ -365,25 +361,27 @@ async def test_opentelemetry_tracing_update_with_start( task_queue=task_queue, id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING, ) - await client.execute_update_with_start_workflow( - update=TracingWorkflow.update_with_start, + await client.start_update_with_start_workflow( + TracingWorkflow.update_with_start, start_workflow_operation=start_op, id=handle.id, + wait_for_stage=WorkflowUpdateStage.ACCEPTED, ) await handle.result() - # issue update with start again now that the former has completed + # issue update with start again to trigger a new workflow + workflow_id = f"workflow_{uuid.uuid4()}" start_op = WithStartWorkflowOperation( TracingWorkflow.run, TracingWorkflowParam(actions=[]), - id="second-workflow", + id=workflow_id, task_queue=task_queue, id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING, ) await client.execute_update_with_start_workflow( update=TracingWorkflow.update_with_start, start_workflow_operation=start_op, - id="second-workflow", + id=workflow_id, ) # Dump debug with attributes, but do string assertion test without From 6ab9c194b5b6f13c5b0a9061347cc7e9d64fff62 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Thu, 9 Oct 2025 14:44:13 -0700 Subject: [PATCH 6/8] move header fan out outside of try block --- temporalio/client.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/temporalio/client.py b/temporalio/client.py index 84fba9dbb..f1a16d39e 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -6363,11 +6363,11 @@ def on_start( err: Optional[BaseException] = None - try: - # fan headers out to both operations - input.start_workflow_input.headers = input.headers - input.update_workflow_input.headers = input.headers + # fan headers out to both operations + input.start_workflow_input.headers = input.headers + input.update_workflow_input.headers = input.headers + try: return await self._start_workflow_update_with_start( input.start_workflow_input, input.update_workflow_input, on_start ) From 767228b841b273e145484bba05409b1ec8579d3e Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Mon, 13 Oct 2025 09:38:28 -0700 Subject: [PATCH 7/8] Revert changes to client as they are unecessary. Inject otel headers into both operation inputs in otel interceptor --- temporalio/client.py | 6 ------ temporalio/contrib/opentelemetry.py | 7 ++++++- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/temporalio/client.py b/temporalio/client.py index f1a16d39e..6c26d41ef 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -1208,7 +1208,6 @@ def on_start_error( start_workflow_input=start_workflow_operation._start_workflow_input, update_workflow_input=update_input, _on_start=on_start, - headers={}, _on_start_error=on_start_error, ) @@ -5539,7 +5538,6 @@ class StartWorkflowUpdateWithStartInput: [temporalio.api.workflowservice.v1.StartWorkflowExecutionResponse], None ] _on_start_error: Callable[[BaseException], None] - headers: Mapping[str, temporalio.api.common.v1.Payload] @dataclass @@ -6363,10 +6361,6 @@ def on_start( err: Optional[BaseException] = None - # fan headers out to both operations - input.start_workflow_input.headers = input.headers - input.update_workflow_input.headers = input.headers - try: return await self._start_workflow_update_with_start( input.start_workflow_input, input.update_workflow_input, on_start diff --git a/temporalio/contrib/opentelemetry.py b/temporalio/contrib/opentelemetry.py index 34a81a6fa..ad451cff7 100644 --- a/temporalio/contrib/opentelemetry.py +++ b/temporalio/contrib/opentelemetry.py @@ -304,9 +304,14 @@ async def start_update_with_start_workflow( with self.root._start_as_current_span( f"StartUpdateWithStartWorkflow:{input.start_workflow_input.workflow}", attributes=attrs, - input=input, + input=input.start_workflow_input, kind=opentelemetry.trace.SpanKind.CLIENT, ): + if input.update_workflow_input: + input.update_workflow_input.headers = self.root._context_to_headers( + input.update_workflow_input.headers + ) + return await super().start_update_with_start_workflow(input) From 2d41030096c4a3dc9c735f4a79837ac55dbbac77 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Mon, 13 Oct 2025 10:52:42 -0700 Subject: [PATCH 8/8] set otel header value directly in update_workflow_input to avoid the extra call to payload conversion --- temporalio/contrib/opentelemetry.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/temporalio/contrib/opentelemetry.py b/temporalio/contrib/opentelemetry.py index ad451cff7..9e1542814 100644 --- a/temporalio/contrib/opentelemetry.py +++ b/temporalio/contrib/opentelemetry.py @@ -307,10 +307,12 @@ async def start_update_with_start_workflow( input=input.start_workflow_input, kind=opentelemetry.trace.SpanKind.CLIENT, ): - if input.update_workflow_input: - input.update_workflow_input.headers = self.root._context_to_headers( - input.update_workflow_input.headers - ) + otel_header = input.start_workflow_input.headers.get(self.root.header_key) + if otel_header: + input.update_workflow_input.headers = { + **input.update_workflow_input.headers, + self.root.header_key: otel_header, + } return await super().start_update_with_start_workflow(input)