Skip to content

Commit 6670ee5

Browse files
committed
Ensure otel test will wait until after task fails to send update
1 parent 020f663 commit 6670ee5

File tree

1 file changed

+25
-10
lines changed

1 file changed

+25
-10
lines changed

tests/contrib/test_opentelemetry.py

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from temporalio.contrib.opentelemetry import TracingInterceptor
2020
from temporalio.contrib.opentelemetry import workflow as otel_workflow
2121
from temporalio.testing import WorkflowEnvironment
22-
from temporalio.worker import Worker
22+
from temporalio.worker import UnsandboxedWorkflowRunner, Worker
2323

2424

2525
@dataclass
@@ -48,6 +48,7 @@ class TracingWorkflowAction:
4848
activity: Optional[TracingWorkflowActionActivity] = None
4949
continue_as_new: Optional[TracingWorkflowActionContinueAsNew] = None
5050
wait_until_signal_count: int = 0
51+
wait_and_do_update: bool = False
5152

5253

5354
@dataclass
@@ -71,10 +72,14 @@ class TracingWorkflowActionContinueAsNew:
7172
param: TracingWorkflowParam
7273

7374

75+
ready_for_update: asyncio.Semaphore
76+
77+
7478
@workflow.defn
7579
class TracingWorkflow:
7680
def __init__(self) -> None:
7781
self._signal_count = 0
82+
self._did_update = False
7883

7984
@workflow.run
8085
async def run(self, param: TracingWorkflowParam) -> None:
@@ -126,6 +131,9 @@ async def run(self, param: TracingWorkflowParam) -> None:
126131
await workflow.wait_condition(
127132
lambda: self._signal_count >= action.wait_until_signal_count
128133
)
134+
if action.wait_and_do_update:
135+
ready_for_update.release()
136+
await workflow.wait_condition(lambda: self._did_update)
129137

130138
async def _raise_on_non_replay(self) -> None:
131139
replaying = workflow.unsafe.is_replaying()
@@ -144,13 +152,11 @@ def signal(self) -> None:
144152
self._signal_count += 1
145153

146154
@workflow.update
147-
def update(self) -> int:
148-
self._signal_count += 1
149-
return self._signal_count
155+
def update(self) -> None:
156+
self._did_update = True
150157

151158
@update.validator
152159
def update_validator(self) -> None:
153-
print("Actually in validator")
154160
pass
155161

156162

@@ -160,6 +166,8 @@ async def test_opentelemetry_tracing(client: Client, env: WorkflowEnvironment):
160166
pytest.skip(
161167
"Java test server: https://github.com/temporalio/sdk-java/issues/1424"
162168
)
169+
global ready_for_update
170+
ready_for_update = asyncio.Semaphore(0)
163171
# Create a tracer that has an in-memory exporter
164172
exporter = InMemorySpanExporter()
165173
provider = TracerProvider()
@@ -176,6 +184,8 @@ async def test_opentelemetry_tracing(client: Client, env: WorkflowEnvironment):
176184
task_queue=task_queue,
177185
workflows=[TracingWorkflow],
178186
activities=[tracing_activity],
187+
# Needed so we can wait to send update at the right time
188+
workflow_runner=UnsandboxedWorkflowRunner(),
179189
):
180190
# Run workflow with various actions
181191
workflow_id = f"workflow_{uuid.uuid4()}"
@@ -185,15 +195,17 @@ async def test_opentelemetry_tracing(client: Client, env: WorkflowEnvironment):
185195
actions=[
186196
# First fail on replay
187197
TracingWorkflowAction(fail_on_non_replay=True),
188-
# Wait for a signal & update
189-
TracingWorkflowAction(wait_until_signal_count=2),
198+
# Wait for a signal
199+
TracingWorkflowAction(wait_until_signal_count=1),
190200
# Exec activity that fails task before complete
191201
TracingWorkflowAction(
192202
activity=TracingWorkflowActionActivity(
193203
param=TracingActivityParam(fail_until_attempt=2),
194204
fail_on_non_replay_before_complete=True,
195205
),
196206
),
207+
# Wait for update
208+
TracingWorkflowAction(wait_and_do_update=True),
197209
# Exec child workflow that fails task before complete
198210
TracingWorkflowAction(
199211
child_workflow=TracingWorkflowActionChildWorkflow(
@@ -240,7 +252,10 @@ async def test_opentelemetry_tracing(client: Client, env: WorkflowEnvironment):
240252
# Send query, then signal to move it along
241253
assert "some query" == await handle.query(TracingWorkflow.query)
242254
await handle.signal(TracingWorkflow.signal)
243-
await handle.execute_update(TracingWorkflow.update)
255+
# Wait to send the update until after the things that fail tasks are over, as failing a task while the update
256+
# is running can mean we execute it twice, which will mess up our spans.
257+
async with ready_for_update:
258+
await handle.execute_update(TracingWorkflow.update)
244259
await handle.result()
245260

246261
# Dump debug with attributes, but do string assertion test without
@@ -253,11 +268,11 @@ async def test_opentelemetry_tracing(client: Client, env: WorkflowEnvironment):
253268
" RunWorkflow:TracingWorkflow",
254269
" MyCustomSpan",
255270
" HandleSignal:signal (links: SignalWorkflow:signal)",
256-
" ValidateUpdate:update (links: StartWorkflowUpdate:update)",
257-
" HandleUpdate:update (links: StartWorkflowUpdate:update)",
258271
" StartActivity:tracing_activity",
259272
" RunActivity:tracing_activity",
260273
" RunActivity:tracing_activity",
274+
" ValidateUpdate:update (links: StartWorkflowUpdate:update)",
275+
" HandleUpdate:update (links: StartWorkflowUpdate:update)",
261276
" StartChildWorkflow:TracingWorkflow",
262277
" RunWorkflow:TracingWorkflow",
263278
" MyCustomSpan",

0 commit comments

Comments
 (0)