From 2588d3fe812c33e88408757c1004bebdb3df8ddb Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Tue, 19 Aug 2025 16:43:42 -0700 Subject: [PATCH 1/2] Don't add startActivity, etc spans when there is no parent trace in which to nest them --- .../openai_agents/_trace_interceptor.py | 73 ++++++++++++------- 1 file changed, 48 insertions(+), 25 deletions(-) diff --git a/temporalio/contrib/openai_agents/_trace_interceptor.py b/temporalio/contrib/openai_agents/_trace_interceptor.py index b004d861f..2f12adf22 100644 --- a/temporalio/contrib/openai_agents/_trace_interceptor.py +++ b/temporalio/contrib/openai_agents/_trace_interceptor.py @@ -5,14 +5,14 @@ import random import uuid from contextlib import contextmanager -from typing import Any, Mapping, Protocol, Type +from typing import Any, Mapping, Optional, Protocol, Type from agents import CustomSpanData, custom_span, get_current_span, trace from agents.tracing import ( get_trace_provider, ) from agents.tracing.scope import Scope -from agents.tracing.spans import NoOpSpan +from agents.tracing.spans import NoOpSpan, Span import temporalio.activity import temporalio.api.common.v1 @@ -370,55 +370,78 @@ class _ContextPropagationWorkflowOutboundInterceptor( async def signal_child_workflow( self, input: temporalio.worker.SignalChildWorkflowInput ) -> None: - with custom_span( - name="temporal:signalChildWorkflow", - data={"workflowId": input.child_workflow_id}, - ): + trace = get_trace_provider().get_current_trace() + if trace: + with custom_span( + name="temporal:signalChildWorkflow", + data={"workflowId": input.child_workflow_id}, + ): + set_header_from_context(input, temporalio.workflow.payload_converter()) + await self.next.signal_child_workflow(input) + else: set_header_from_context(input, temporalio.workflow.payload_converter()) await self.next.signal_child_workflow(input) async def signal_external_workflow( self, input: temporalio.worker.SignalExternalWorkflowInput ) -> None: - with custom_span( - name="temporal:signalExternalWorkflow", - data={"workflowId": input.workflow_id}, - ): + trace = get_trace_provider().get_current_trace() + if trace: + with custom_span( + name="temporal:signalExternalWorkflow", + data={"workflowId": input.workflow_id}, + ): + set_header_from_context(input, temporalio.workflow.payload_converter()) + await self.next.signal_external_workflow(input) + else: set_header_from_context(input, temporalio.workflow.payload_converter()) await self.next.signal_external_workflow(input) def start_activity( self, input: temporalio.worker.StartActivityInput ) -> temporalio.workflow.ActivityHandle: - span = custom_span( - name="temporal:startActivity", data={"activity": input.activity} - ) - span.start(mark_as_current=True) + trace = get_trace_provider().get_current_trace() + span: Optional[Span] = None + if trace: + span = custom_span( + name="temporal:startActivity", data={"activity": input.activity} + ) + span.start(mark_as_current=True) + set_header_from_context(input, temporalio.workflow.payload_converter()) handle = self.next.start_activity(input) - handle.add_done_callback(lambda _: span.finish()) + if span: + handle.add_done_callback(lambda _: span.finish() if span else None) return handle async def start_child_workflow( self, input: temporalio.worker.StartChildWorkflowInput ) -> temporalio.workflow.ChildWorkflowHandle: - span = custom_span( - name="temporal:startChildWorkflow", data={"workflow": input.workflow} - ) - span.start(mark_as_current=True) + trace = get_trace_provider().get_current_trace() + span: Optional[Span] = None + if trace: + span = custom_span( + name="temporal:startChildWorkflow", data={"workflow": input.workflow} + ) + span.start(mark_as_current=True) set_header_from_context(input, temporalio.workflow.payload_converter()) handle = await self.next.start_child_workflow(input) - handle.add_done_callback(lambda _: span.finish()) + if span: + handle.add_done_callback(lambda _: span.finish() if span else None) return handle def start_local_activity( self, input: temporalio.worker.StartLocalActivityInput ) -> temporalio.workflow.ActivityHandle: - span = custom_span( - name="temporal:startLocalActivity", data={"activity": input.activity} - ) - span.start(mark_as_current=True) + trace = get_trace_provider().get_current_trace() + span: Optional[Span] = None + if trace: + span = custom_span( + name="temporal:startLocalActivity", data={"activity": input.activity} + ) + span.start(mark_as_current=True) set_header_from_context(input, temporalio.workflow.payload_converter()) handle = self.next.start_local_activity(input) - handle.add_done_callback(lambda _: span.finish()) + if span: + handle.add_done_callback(lambda _: span.finish() if span else None) return handle From fdc987b4986bc62bcad7a7c332986616e6d0c331 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Wed, 20 Aug 2025 10:02:38 -0700 Subject: [PATCH 2/2] Type ignore instead of recheck --- temporalio/contrib/openai_agents/_trace_interceptor.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/temporalio/contrib/openai_agents/_trace_interceptor.py b/temporalio/contrib/openai_agents/_trace_interceptor.py index 2f12adf22..20d489b65 100644 --- a/temporalio/contrib/openai_agents/_trace_interceptor.py +++ b/temporalio/contrib/openai_agents/_trace_interceptor.py @@ -411,7 +411,7 @@ def start_activity( set_header_from_context(input, temporalio.workflow.payload_converter()) handle = self.next.start_activity(input) if span: - handle.add_done_callback(lambda _: span.finish() if span else None) + handle.add_done_callback(lambda _: span.finish()) # type: ignore return handle async def start_child_workflow( @@ -427,7 +427,7 @@ async def start_child_workflow( set_header_from_context(input, temporalio.workflow.payload_converter()) handle = await self.next.start_child_workflow(input) if span: - handle.add_done_callback(lambda _: span.finish() if span else None) + handle.add_done_callback(lambda _: span.finish()) # type: ignore return handle def start_local_activity( @@ -443,5 +443,5 @@ def start_local_activity( set_header_from_context(input, temporalio.workflow.payload_converter()) handle = self.next.start_local_activity(input) if span: - handle.add_done_callback(lambda _: span.finish() if span else None) + handle.add_done_callback(lambda _: span.finish()) # type: ignore return handle