Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 48 additions & 25 deletions temporalio/contrib/openai_agents/_trace_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
import random
import uuid
from contextlib import contextmanager
from typing import Any, Mapping, Protocol, Type
from typing import Any, Mapping, Optional, Protocol, Type

from agents import CustomSpanData, custom_span, get_current_span, trace
from agents.tracing import (
get_trace_provider,
)
from agents.tracing.scope import Scope
from agents.tracing.spans import NoOpSpan
from agents.tracing.spans import NoOpSpan, Span

import temporalio.activity
import temporalio.api.common.v1
Expand Down Expand Up @@ -370,55 +370,78 @@ class _ContextPropagationWorkflowOutboundInterceptor(
async def signal_child_workflow(
self, input: temporalio.worker.SignalChildWorkflowInput
) -> None:
with custom_span(
name="temporal:signalChildWorkflow",
data={"workflowId": input.child_workflow_id},
):
trace = get_trace_provider().get_current_trace()
if trace:
with custom_span(
name="temporal:signalChildWorkflow",
data={"workflowId": input.child_workflow_id},
):
set_header_from_context(input, temporalio.workflow.payload_converter())
await self.next.signal_child_workflow(input)
else:
set_header_from_context(input, temporalio.workflow.payload_converter())
await self.next.signal_child_workflow(input)

async def signal_external_workflow(
self, input: temporalio.worker.SignalExternalWorkflowInput
) -> None:
with custom_span(
name="temporal:signalExternalWorkflow",
data={"workflowId": input.workflow_id},
):
trace = get_trace_provider().get_current_trace()
if trace:
with custom_span(
name="temporal:signalExternalWorkflow",
data={"workflowId": input.workflow_id},
):
set_header_from_context(input, temporalio.workflow.payload_converter())
await self.next.signal_external_workflow(input)
else:
set_header_from_context(input, temporalio.workflow.payload_converter())
await self.next.signal_external_workflow(input)

def start_activity(
self, input: temporalio.worker.StartActivityInput
) -> temporalio.workflow.ActivityHandle:
span = custom_span(
name="temporal:startActivity", data={"activity": input.activity}
)
span.start(mark_as_current=True)
trace = get_trace_provider().get_current_trace()
span: Optional[Span] = None
if trace:
span = custom_span(
name="temporal:startActivity", data={"activity": input.activity}
)
span.start(mark_as_current=True)

set_header_from_context(input, temporalio.workflow.payload_converter())
handle = self.next.start_activity(input)
handle.add_done_callback(lambda _: span.finish())
if span:
handle.add_done_callback(lambda _: span.finish()) # type: ignore
return handle

async def start_child_workflow(
self, input: temporalio.worker.StartChildWorkflowInput
) -> temporalio.workflow.ChildWorkflowHandle:
span = custom_span(
name="temporal:startChildWorkflow", data={"workflow": input.workflow}
)
span.start(mark_as_current=True)
trace = get_trace_provider().get_current_trace()
span: Optional[Span] = None
if trace:
span = custom_span(
name="temporal:startChildWorkflow", data={"workflow": input.workflow}
)
span.start(mark_as_current=True)
set_header_from_context(input, temporalio.workflow.payload_converter())
handle = await self.next.start_child_workflow(input)
handle.add_done_callback(lambda _: span.finish())
if span:
handle.add_done_callback(lambda _: span.finish()) # type: ignore
return handle

def start_local_activity(
self, input: temporalio.worker.StartLocalActivityInput
) -> temporalio.workflow.ActivityHandle:
span = custom_span(
name="temporal:startLocalActivity", data={"activity": input.activity}
)
span.start(mark_as_current=True)
trace = get_trace_provider().get_current_trace()
span: Optional[Span] = None
if trace:
span = custom_span(
name="temporal:startLocalActivity", data={"activity": input.activity}
)
span.start(mark_as_current=True)
set_header_from_context(input, temporalio.workflow.payload_converter())
handle = self.next.start_local_activity(input)
handle.add_done_callback(lambda _: span.finish())
if span:
handle.add_done_callback(lambda _: span.finish()) # type: ignore
return handle
Loading