Skip to content

Commit a0338c1

Browse files
committed
💥 Move tracing uuids to use a separate Random from the workflow seed (#992)
* Move tracing uuids to use a separate Random from the workflow seed * Fixing replay tests * Switch to using a seed based on run_id * Remove test failure * Add comment
1 parent e4f401d commit a0338c1

File tree

3 files changed

+52
-6
lines changed

3 files changed

+52
-6
lines changed

temporalio/contrib/openai_agents/_temporal_trace_provider.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from agents.tracing.spans import Span
1616

1717
from temporalio import workflow
18+
from temporalio.contrib.openai_agents._trace_interceptor import RunIdRandom
1819
from temporalio.workflow import ReadOnlyContextError
1920

2021

@@ -133,6 +134,13 @@ def force_flush(self) -> None:
133134
self._impl.force_flush()
134135

135136

137+
def _workflow_uuid() -> str:
138+
random = cast(
139+
RunIdRandom, getattr(workflow.instance(), "__temporal_openai_tracing_random")
140+
)
141+
return random.uuid4()
142+
143+
136144
class TemporalTraceProvider(DefaultTraceProvider):
137145
"""A trace provider that integrates with Temporal workflows."""
138146

@@ -156,7 +164,7 @@ def gen_trace_id(self) -> str:
156164
if workflow.in_workflow():
157165
try:
158166
"""Generate a new trace ID."""
159-
return f"trace_{workflow.uuid4().hex}"
167+
return f"trace_{_workflow_uuid()}"
160168
except ReadOnlyContextError:
161169
return f"trace_{uuid.uuid4().hex}"
162170
return super().gen_trace_id()
@@ -166,7 +174,7 @@ def gen_span_id(self) -> str:
166174
if workflow.in_workflow():
167175
try:
168176
"""Generate a deterministic span ID."""
169-
return f"span_{workflow.uuid4().hex[:24]}"
177+
return f"span_{_workflow_uuid()}"
170178
except ReadOnlyContextError:
171179
return f"span_{uuid.uuid4().hex[:24]}"
172180
return super().gen_span_id()
@@ -176,7 +184,7 @@ def gen_group_id(self) -> str:
176184
if workflow.in_workflow():
177185
try:
178186
"""Generate a deterministic group ID."""
179-
return f"group_{workflow.uuid4().hex[:24]}"
187+
return f"group_{_workflow_uuid()}"
180188
except ReadOnlyContextError:
181189
return f"group_{uuid.uuid4().hex[:24]}"
182190
return super().gen_group_id()

temporalio/contrib/openai_agents/_trace_interceptor.py

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,17 @@
22

33
from __future__ import annotations
44

5+
import random
6+
import uuid
57
from contextlib import contextmanager
6-
from typing import Any, Mapping, Protocol, Type, cast
8+
from typing import Any, Mapping, Protocol, Type
79

810
from agents import CustomSpanData, custom_span, get_current_span, trace
911
from agents.tracing import (
1012
get_trace_provider,
1113
)
12-
from agents.tracing.provider import DefaultTraceProvider
1314
from agents.tracing.scope import Scope
14-
from agents.tracing.spans import NoOpSpan, SpanImpl
15+
from agents.tracing.spans import NoOpSpan
1516

1617
import temporalio.activity
1718
import temporalio.api.common.v1
@@ -283,6 +284,35 @@ async def execute_activity(
283284
return await self.next.execute_activity(input)
284285

285286

287+
class RunIdRandom:
288+
"""Random uuid generator seeded by the run id of the workflow.
289+
Doesn't currently support replay over reset correctly.
290+
"""
291+
292+
def __init__(self):
293+
"""Create a new random UUID generator."""
294+
self._random = random.Random("OpenAIPlugin" + workflow.info().run_id)
295+
296+
def uuid4(self) -> str:
297+
"""Generate a random UUID."""
298+
return uuid.UUID(
299+
bytes=random.getrandbits(16 * 8).to_bytes(16, "big"), version=4
300+
).hex[:24]
301+
302+
303+
def _ensure_tracing_random() -> None:
304+
"""We use a custom uuid generator for spans to ensure that changes to user code workflow.random usage
305+
do not affect tracing and vice versa.
306+
"""
307+
instance = workflow.instance()
308+
if not hasattr(instance, "__temporal_openai_tracing_random"):
309+
setattr(
310+
workflow.instance(),
311+
"__temporal_openai_tracing_random",
312+
RunIdRandom(),
313+
)
314+
315+
286316
class _ContextPropagationWorkflowInboundInterceptor(
287317
temporalio.worker.WorkflowInboundInterceptor
288318
):
@@ -292,18 +322,21 @@ def init(self, outbound: temporalio.worker.WorkflowOutboundInterceptor) -> None:
292322
async def execute_workflow(
293323
self, input: temporalio.worker.ExecuteWorkflowInput
294324
) -> Any:
325+
_ensure_tracing_random()
295326
with context_from_header(
296327
"temporal:executeWorkflow", input, temporalio.workflow.payload_converter()
297328
):
298329
return await self.next.execute_workflow(input)
299330

300331
async def handle_signal(self, input: temporalio.worker.HandleSignalInput) -> None:
332+
_ensure_tracing_random()
301333
with context_from_header(
302334
"temporal:handleSignal", input, temporalio.workflow.payload_converter()
303335
):
304336
return await self.next.handle_signal(input)
305337

306338
async def handle_query(self, input: temporalio.worker.HandleQueryInput) -> Any:
339+
_ensure_tracing_random()
307340
with context_from_header(
308341
"temporal:handleQuery", input, temporalio.workflow.payload_converter()
309342
):
@@ -322,6 +355,7 @@ def handle_update_validator(
322355
async def handle_update_handler(
323356
self, input: temporalio.worker.HandleUpdateInput
324357
) -> Any:
358+
_ensure_tracing_random()
325359
with context_from_header(
326360
"temporal:handleUpdateHandler",
327361
input,

tests/contrib/openai_agents/test_openai_replay.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
from temporalio.contrib.openai_agents._temporal_openai_agents import (
88
set_open_ai_agent_temporal_overrides,
99
)
10+
from temporalio.contrib.openai_agents._trace_interceptor import (
11+
OpenAIAgentsTracingInterceptor,
12+
)
1013
from temporalio.contrib.pydantic import pydantic_data_converter
1114
from temporalio.worker import Replayer
1215
from tests.contrib.openai_agents.test_openai import (
@@ -48,4 +51,5 @@ async def test_replay(file_name: str) -> None:
4851
OutputGuardrailWorkflow,
4952
],
5053
data_converter=pydantic_data_converter,
54+
interceptors=[OpenAIAgentsTracingInterceptor()],
5155
).replay_workflow(WorkflowHistory.from_json("fake", history_json))

0 commit comments

Comments
 (0)