From e08802e9b331d80a7fbd29bbae38f4bd373b5cf5 Mon Sep 17 00:00:00 2001 From: Greg Brown Date: Thu, 19 Jun 2025 12:31:46 +0100 Subject: [PATCH 1/6] fix(opentelemetry): trace propagation in process pool activities - Add test to show trace context is not available --- .vscode/settings.json | 7 +++ tests/contrib/test_opentelemetry.py | 75 ++++++++++++++++++++++++++++- 2 files changed, 81 insertions(+), 1 deletion(-) create mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 000000000..9b388533a --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,7 @@ +{ + "python.testing.pytestArgs": [ + "tests" + ], + "python.testing.unittestEnabled": false, + "python.testing.pytestEnabled": true +} \ No newline at end of file diff --git a/tests/contrib/test_opentelemetry.py b/tests/contrib/test_opentelemetry.py index e42e6b977..a3617c9ae 100644 --- a/tests/contrib/test_opentelemetry.py +++ b/tests/contrib/test_opentelemetry.py @@ -2,17 +2,23 @@ import asyncio import logging +import multiprocessing +import trace +import typing import uuid from dataclasses import dataclass from datetime import timedelta from typing import Iterable, List, Optional +import concurrent.futures from opentelemetry.sdk.trace import ReadableSpan, TracerProvider from opentelemetry.sdk.trace.export import SimpleSpanProcessor from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter -from opentelemetry.trace import get_tracer +from opentelemetry.trace import get_tracer, get_current_span +from opentelemetry.context import get_current from temporalio import activity, workflow +from temporalio.worker import SharedStateManager from temporalio.client import Client from temporalio.common import RetryPolicy from temporalio.contrib.opentelemetry import TracingInterceptor @@ -392,3 +398,70 @@ async def test_opentelemetry_always_create_workflow_spans(client: Client): # * workflow failure and wft failure # * signal with start 
# * signal failure and wft failure from signal + + +@workflow.defn +class ActivityTracePropagationWorkflow: + @workflow.run + async def run(self) -> str: + retry_policy = RetryPolicy(initial_interval=timedelta(milliseconds=1)) + return await workflow.execute_activity( + sync_activity, + {}, + start_to_close_timeout=timedelta(seconds=10), + retry_policy=retry_policy, + ) + + +@activity.defn +def sync_activity(param: typing.Any) -> str: + current_span = get_current_span() + is_recording = current_span.is_recording() + logging.debug("[sync_activity] Current span is recording: %s", is_recording) + # trace_context = get_current() + # logging.debug("Trace context:\n%s", "\n".join(trace_context)) + inner_tracer = get_tracer("sync_activity") + with inner_tracer.start_as_current_span( + "child_span", + ): + return "done" + + +async def test_activity_trace_propagation( + client: Client, + env: WorkflowEnvironment, +): + # TODO: test all kinds of workers (just to check we haven't broken others) + # TODO: add spy interceptor to check `input.fn` wraps original metadata + + # Create a tracer that has an in-memory exporter + exporter = InMemorySpanExporter() + provider = TracerProvider() + provider.add_span_processor(SimpleSpanProcessor(exporter)) + tracer = get_tracer(__name__, tracer_provider=provider) + + # Create a worker with an process pool activity executor + async with Worker( + client, + task_queue=f"task_queue_{uuid.uuid4()}", + workflows=[ActivityTracePropagationWorkflow], + activities=[sync_activity], + interceptors=[TracingInterceptor(tracer)], + activity_executor=concurrent.futures.ProcessPoolExecutor(max_workers=1), + shared_state_manager=SharedStateManager.create_from_multiprocessing( + multiprocessing.Manager() + ), + ) as worker: + assert "done" == await client.execute_workflow( + ActivityTracePropagationWorkflow.run, + id=f"workflow_{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + + # Expect the child span to be there + spans = 
exporter.get_finished_spans() + logging.debug("Spans:\n%s", "\n".join(dump_spans(spans, with_attributes=False))) + assert dump_spans(exporter.get_finished_spans(), with_attributes=False) == [ + "RunActivity:sync_activity", + " child_span", + ] From 98ed4920dad59b5fe763e9348ba94e78845d609f Mon Sep 17 00:00:00 2001 From: Greg Brown Date: Sun, 20 Jul 2025 16:42:11 +0100 Subject: [PATCH 2/6] wip: add impl and try to make test pass --- temporalio/contrib/opentelemetry.py | 41 +++++++++++++++++++++++++++++ tests/contrib/test_opentelemetry.py | 9 +++++-- 2 files changed, 48 insertions(+), 2 deletions(-) diff --git a/temporalio/contrib/opentelemetry.py b/temporalio/contrib/opentelemetry.py index 84773fd43..7e246ca84 100644 --- a/temporalio/contrib/opentelemetry.py +++ b/temporalio/contrib/opentelemetry.py @@ -4,6 +4,9 @@ from contextlib import contextmanager from dataclasses import dataclass +import dataclasses +import functools +import inspect from typing import ( Any, Callable, @@ -17,6 +20,7 @@ cast, ) +import concurrent.futures import opentelemetry.baggage.propagation import opentelemetry.context import opentelemetry.context.context @@ -293,9 +297,46 @@ async def execute_activity( }, kind=opentelemetry.trace.SpanKind.SERVER, ): + # Propagate trace_context into synchronous activities running in + # ProcessPoolExecutor + is_async = inspect.iscoroutinefunction( + input.fn + ) or inspect.iscoroutinefunction(input.fn.__call__) + is_threadpool_executor = isinstance( + input.executor, concurrent.futures.ThreadPoolExecutor + ) + if not (is_async or is_threadpool_executor): + carrier = {} + default_text_map_propagator.inject(carrier) + input.fn = ActivityFnWithTraceContext(input.fn, carrier) + return await super().execute_activity(input) +# Note: the activity function must be picklable to pass to the ProcessPoolExecutor. 
+# We wrap the original function to propagate the trace context as transparently +# as possible (otherwise _ActivityInboundImpl would need to be modified to pass +# trace context as an extra arg). +@dataclasses.dataclass +class ActivityFnWithTraceContext: + fn: Callable[..., Any] + context: _CarrierDict + + def __post_init__(self): + # Preserve the original function's metadata to support reflection. + # Downstream interceptors may inspect these attributes (__module__, __name__, etc.) + # e.g. SentryInterceptor in the Python Samples + functools.wraps(self.fn)(self) + + def __call__(self, *args: Any, **kwargs: Any): + trace_context = default_text_map_propagator.extract(self.context) + token = opentelemetry.context.attach(trace_context) + try: + return self.fn(*args, **kwargs) + finally: + opentelemetry.context.detach(token) + + class _InputWithHeaders(Protocol): headers: Mapping[str, temporalio.api.common.v1.Payload] diff --git a/tests/contrib/test_opentelemetry.py b/tests/contrib/test_opentelemetry.py index a3617c9ae..fd2245077 100644 --- a/tests/contrib/test_opentelemetry.py +++ b/tests/contrib/test_opentelemetry.py @@ -11,11 +11,11 @@ from typing import Iterable, List, Optional import concurrent.futures +import opentelemetry.trace from opentelemetry.sdk.trace import ReadableSpan, TracerProvider from opentelemetry.sdk.trace.export import SimpleSpanProcessor from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter from opentelemetry.trace import get_tracer, get_current_span -from opentelemetry.context import get_current from temporalio import activity, workflow from temporalio.worker import SharedStateManager @@ -440,6 +440,9 @@ async def test_activity_trace_propagation( provider.add_span_processor(SimpleSpanProcessor(exporter)) tracer = get_tracer(__name__, tracer_provider=provider) + # def initializer() -> None: + opentelemetry.trace.set_tracer_provider(provider) + # Create a worker with an process pool activity executor async with 
Worker( client, @@ -447,7 +450,9 @@ async def test_activity_trace_propagation( workflows=[ActivityTracePropagationWorkflow], activities=[sync_activity], interceptors=[TracingInterceptor(tracer)], - activity_executor=concurrent.futures.ProcessPoolExecutor(max_workers=1), + activity_executor=concurrent.futures.ProcessPoolExecutor( + max_workers=1 # , initializer=initializer + ), shared_state_manager=SharedStateManager.create_from_multiprocessing( multiprocessing.Manager() ), From 3d2596ef3309dccb92758f58f1b75b79828a0138 Mon Sep 17 00:00:00 2001 From: Greg Brown Date: Sun, 3 Aug 2025 19:17:59 +0100 Subject: [PATCH 3/6] wip: working test case - This test implementation isn't to be taken as a reference for production. The fixed `TracingInterceptor` works in production, provided you use the `OTLPSpanExporter` or other exporter that pushes traces to a collector or backend, rather than one that pulls traces from the server (if one exists). - Add a custom span exporter to write finished_spans to a list proxy created by the server process manager. This is because we want to test the full trace across the process pool. Again, in production, the child process can just export spans directly to a remote collector. Tracing is designed to handle distributed systems. 
- Ensure the child process is initialised with its own TracerProvider to avoid different default mp_context behaviours across MacOS and Linux --- temporalio/contrib/opentelemetry.py | 39 ++++--- tests/contrib/test_opentelemetry.py | 167 +++++++++++++++++++++++----- 2 files changed, 164 insertions(+), 42 deletions(-) diff --git a/temporalio/contrib/opentelemetry.py b/temporalio/contrib/opentelemetry.py index 7e246ca84..b2492f2b9 100644 --- a/temporalio/contrib/opentelemetry.py +++ b/temporalio/contrib/opentelemetry.py @@ -2,11 +2,12 @@ from __future__ import annotations -from contextlib import contextmanager -from dataclasses import dataclass +import concurrent.futures import dataclasses import functools import inspect +from contextlib import contextmanager +from dataclasses import dataclass from typing import ( Any, Callable, @@ -20,7 +21,6 @@ cast, ) -import concurrent.futures import opentelemetry.baggage.propagation import opentelemetry.context import opentelemetry.context.context @@ -301,35 +301,44 @@ async def execute_activity( # ProcessPoolExecutor is_async = inspect.iscoroutinefunction( input.fn - ) or inspect.iscoroutinefunction(input.fn.__call__) + ) or inspect.iscoroutinefunction( + input.fn.__call__ # type: ignore + ) is_threadpool_executor = isinstance( input.executor, concurrent.futures.ThreadPoolExecutor ) if not (is_async or is_threadpool_executor): - carrier = {} + carrier: _CarrierDict = {} default_text_map_propagator.inject(carrier) input.fn = ActivityFnWithTraceContext(input.fn, carrier) return await super().execute_activity(input) -# Note: the activity function must be picklable to pass to the ProcessPoolExecutor. -# We wrap the original function to propagate the trace context as transparently -# as possible (otherwise _ActivityInboundImpl would need to be modified to pass -# trace context as an extra arg). @dataclasses.dataclass class ActivityFnWithTraceContext: + """Wraps an activity function to inject trace context from a carrier. 
+ + This wrapper is intended for sync activities executed in a process pool executor + to ensure tracing features like child spans, trace events, and log-correlation + works properly in the user's activity implementation. + """ + fn: Callable[..., Any] - context: _CarrierDict + carrier: _CarrierDict def __post_init__(self): - # Preserve the original function's metadata to support reflection. - # Downstream interceptors may inspect these attributes (__module__, __name__, etc.) - # e.g. SentryInterceptor in the Python Samples + """Post-initialization to ensure the function is wrapped correctly. + + Ensures the original function's metadata is preserved for reflection. + Downstream interceptors that may inspect the function's attributes, + like `__module__`, `__name__`, etc. (e.g. the `SentryInterceptor` + in the Python Samples.) + """ functools.wraps(self.fn)(self) - def __call__(self, *args: Any, **kwargs: Any): - trace_context = default_text_map_propagator.extract(self.context) + def __call__(self, *args: Any, **kwargs: Any): # noqa: D102 + trace_context = default_text_map_propagator.extract(self.carrier) token = opentelemetry.context.attach(trace_context) try: return self.fn(*args, **kwargs) diff --git a/tests/contrib/test_opentelemetry.py b/tests/contrib/test_opentelemetry.py index fd2245077..bb3d21acd 100644 --- a/tests/contrib/test_opentelemetry.py +++ b/tests/contrib/test_opentelemetry.py @@ -1,30 +1,34 @@ from __future__ import annotations import asyncio +import concurrent.futures import logging import multiprocessing -import trace +import multiprocessing.managers +import threading import typing import uuid from dataclasses import dataclass from datetime import timedelta -from typing import Iterable, List, Optional +from typing import Any, Dict, Iterable, List, Optional, Sequence, Union -import concurrent.futures import opentelemetry.trace from opentelemetry.sdk.trace import ReadableSpan, TracerProvider -from opentelemetry.sdk.trace.export import 
SimpleSpanProcessor +from opentelemetry.sdk.trace.export import ( + SimpleSpanProcessor, + SpanExporter, + SpanExportResult, +) from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter -from opentelemetry.trace import get_tracer, get_current_span +from opentelemetry.trace import get_current_span, get_tracer from temporalio import activity, workflow -from temporalio.worker import SharedStateManager from temporalio.client import Client from temporalio.common import RetryPolicy from temporalio.contrib.opentelemetry import TracingInterceptor from temporalio.contrib.opentelemetry import workflow as otel_workflow from temporalio.testing import WorkflowEnvironment -from temporalio.worker import UnsandboxedWorkflowRunner, Worker +from temporalio.worker import SharedStateManager, UnsandboxedWorkflowRunner, Worker # Passing through because Python 3.9 has an import bug at # https://github.com/python/cpython/issues/91351 @@ -85,6 +89,58 @@ class TracingWorkflowActionContinueAsNew: ready_for_update: asyncio.Semaphore +@dataclass(frozen=True) +class SerialisableSpan: + @dataclass(frozen=True) + class SpanContext: + trace_id: int + span_id: int + + @classmethod + def from_span_context( + cls, context: opentelemetry.trace.SpanContext + ) -> "SerialisableSpan.SpanContext": + return cls( + trace_id=context.trace_id, + span_id=context.span_id, + ) + + @classmethod + def from_optional_span_context( + cls, context: Optional[opentelemetry.trace.SpanContext] + ) -> Optional["SerialisableSpan.SpanContext"]: + if context is None: + return None + return cls.from_span_context(context) + + @dataclass(frozen=True) + class Link: + context: SerialisableSpan.SpanContext + attributes: Dict[str, Any] + + name: str + context: Optional[SpanContext] + parent: Optional[SpanContext] + attributes: Dict[str, Any] + links: Sequence[Link] + + @classmethod + def from_readable_span(cls, span: ReadableSpan) -> "SerialisableSpan": + return cls( + name=span.name, + 
context=cls.SpanContext.from_optional_span_context(span.context), + parent=cls.SpanContext.from_optional_span_context(span.parent), + attributes=dict(span.attributes or {}), + links=tuple( + cls.Link( + context=cls.SpanContext.from_span_context(link.context), + attributes=dict(span.attributes or {}), + ) + for link in span.links + ), + ) + + @workflow.defn class TracingWorkflow: def __init__(self) -> None: @@ -306,7 +362,7 @@ async def test_opentelemetry_tracing(client: Client, env: WorkflowEnvironment): def dump_spans( - spans: Iterable[ReadableSpan], + spans: Iterable[Union[ReadableSpan, SerialisableSpan]], *, parent_id: Optional[int] = None, with_attributes: bool = True, @@ -325,7 +381,10 @@ def dump_spans( span_links: List[str] = [] for link in span.links: for link_span in spans: - if link_span.context.span_id == link.context.span_id: + if ( + link_span.context + and link_span.context.span_id == link.context.span_id + ): span_links.append(link_span.name) span_str += f" (links: {', '.join(span_links)})" # Signals can duplicate in rare situations, so we make sure not to @@ -335,7 +394,7 @@ def dump_spans( ret.append(span_str) ret += dump_spans( spans, - parent_id=span.context.span_id, + parent_id=(span.context.span_id if span.context else None), with_attributes=with_attributes, indent_depth=indent_depth + 1, ) @@ -408,18 +467,19 @@ async def run(self) -> str: return await workflow.execute_activity( sync_activity, {}, - start_to_close_timeout=timedelta(seconds=10), + # TODO: Reduce to 10s - increasing to make debugging easier + start_to_close_timeout=timedelta(minutes=10), retry_policy=retry_policy, ) @activity.defn def sync_activity(param: typing.Any) -> str: - current_span = get_current_span() - is_recording = current_span.is_recording() - logging.debug("[sync_activity] Current span is recording: %s", is_recording) - # trace_context = get_current() - # logging.debug("Trace context:\n%s", "\n".join(trace_context)) + """An activity that uses tracing features. 
+ + When executed in a process pool, we expect the trace context to be available + from the parent process. + """ inner_tracer = get_tracer("sync_activity") with inner_tracer.start_as_current_span( "child_span", @@ -431,8 +491,8 @@ async def test_activity_trace_propagation( client: Client, env: WorkflowEnvironment, ): - # TODO: test all kinds of workers (just to check we haven't broken others) # TODO: add spy interceptor to check `input.fn` wraps original metadata + # TODO: Add Resource to show how resource would be propagated # Create a tracer that has an in-memory exporter exporter = InMemorySpanExporter() @@ -440,10 +500,14 @@ async def test_activity_trace_propagation( provider.add_span_processor(SimpleSpanProcessor(exporter)) tracer = get_tracer(__name__, tracer_provider=provider) - # def initializer() -> None: - opentelemetry.trace.set_tracer_provider(provider) + # Create a proxy list using the server process manager which we'll use + # to access finished spans in the process pool + manager = multiprocessing.Manager() + finished_spans_proxy = typing.cast( + multiprocessing.managers.ListProxy[SerialisableSpan], manager.list() + ) - # Create a worker with an process pool activity executor + # Create a worker with a process pool activity executor async with Worker( client, task_queue=f"task_queue_{uuid.uuid4()}", @@ -451,11 +515,11 @@ async def test_activity_trace_propagation( activities=[sync_activity], interceptors=[TracingInterceptor(tracer)], activity_executor=concurrent.futures.ProcessPoolExecutor( - max_workers=1 # , initializer=initializer - ), - shared_state_manager=SharedStateManager.create_from_multiprocessing( - multiprocessing.Manager() + max_workers=1, + initializer=activity_trace_propagation_initializer, + initargs=(finished_spans_proxy,), ), + shared_state_manager=SharedStateManager.create_from_multiprocessing(manager), ) as worker: assert "done" == await client.execute_workflow( ActivityTracePropagationWorkflow.run, @@ -463,10 +527,59 @@ async def 
test_activity_trace_propagation( task_queue=worker.task_queue, ) - # Expect the child span to be there - spans = exporter.get_finished_spans() + spans = exporter.get_finished_spans() + tuple(finished_spans_proxy) logging.debug("Spans:\n%s", "\n".join(dump_spans(spans, with_attributes=False))) - assert dump_spans(exporter.get_finished_spans(), with_attributes=False) == [ + assert dump_spans(spans, with_attributes=False) == [ "RunActivity:sync_activity", " child_span", ] + + +class _ListProxySpanExporter(SpanExporter): + """Implementation of :class:`SpanExporter` that exports spans to a + list proxy created by a multiprocessing manager. + + This class is used for testing multiprocessing setups, as we can get access + to the finished spans from the parent process. + + In production, you would use `OTLPSpanExporter` or similar to export spans. + Tracing is designed to be distributed, the child process can push collected + spans directly to a collector or backend, which can reassemble the spans + into a single trace. + """ + + def __init__( + self, finished_spans: multiprocessing.managers.ListProxy[SerialisableSpan] + ) -> None: + self._finished_spans = finished_spans + self._stopped = False + self._lock = threading.Lock() + + def export(self, spans: typing.Sequence[ReadableSpan]) -> SpanExportResult: + if self._stopped: + return SpanExportResult.FAILURE + with self._lock: + # Note: ReadableSpan is not picklable, so convert to a DTO + # Note: we could use `span.to_json()` but there isn't a `from_json` + # and the serialisation isn't easily reversible, e.g. 
`parent` context + # is lost, span/trace IDs are transformed into strings + self._finished_spans.extend( + [SerialisableSpan.from_readable_span(span) for span in spans] + ) + return SpanExportResult.SUCCESS + + def shutdown(self) -> None: + self._stopped = True + + def force_flush(self, timeout_millis: int = 30000) -> bool: + return True + + +def activity_trace_propagation_initializer( + _finished_spans_proxy: multiprocessing.managers.ListProxy[SerialisableSpan], +) -> None: + """Initializer for the process pool worker to export spans to a shared list.""" + _exporter = _ListProxySpanExporter(_finished_spans_proxy) + _provider = TracerProvider() + _provider.add_span_processor(SimpleSpanProcessor(_exporter)) + opentelemetry.trace.set_tracer_provider(_provider) From 6d703c0d415b0152cfbcd6b6d04b387ba55e6437 Mon Sep 17 00:00:00 2001 From: Greg Brown Date: Sun, 3 Aug 2025 19:45:52 +0100 Subject: [PATCH 4/6] wip: refactor test helpers into separate modules --- .../contrib/opentelemetry/helpers/_init__.py | 0 .../contrib/opentelemetry/helpers/tracing.py | 175 ++++++++++++++++++ tests/contrib/test_opentelemetry.py | 154 ++------------- 3 files changed, 187 insertions(+), 142 deletions(-) create mode 100644 tests/contrib/opentelemetry/helpers/_init__.py create mode 100644 tests/contrib/opentelemetry/helpers/tracing.py diff --git a/tests/contrib/opentelemetry/helpers/_init__.py b/tests/contrib/opentelemetry/helpers/_init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/contrib/opentelemetry/helpers/tracing.py b/tests/contrib/opentelemetry/helpers/tracing.py new file mode 100644 index 000000000..c952d3af3 --- /dev/null +++ b/tests/contrib/opentelemetry/helpers/tracing.py @@ -0,0 +1,175 @@ +from __future__ import annotations + +import asyncio +import concurrent.futures +import logging +import multiprocessing +import multiprocessing.managers +import threading +import typing +import uuid +from dataclasses import dataclass +from datetime import timedelta 
+from typing import Any, Dict, Iterable, List, Optional, Sequence, Union + +import opentelemetry.trace +from opentelemetry.sdk.trace import ReadableSpan, TracerProvider +from opentelemetry.sdk.trace.export import ( + SimpleSpanProcessor, + SpanExporter, + SpanExportResult, +) +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter +from opentelemetry.trace import get_current_span, get_tracer + +from temporalio import activity, workflow +from temporalio.client import Client +from temporalio.common import RetryPolicy +from temporalio.contrib.opentelemetry import TracingInterceptor +from temporalio.contrib.opentelemetry import workflow as otel_workflow +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import SharedStateManager, UnsandboxedWorkflowRunner, Worker + + +@dataclass(frozen=True) +class SerialisableSpan: + """A serialisable, incomplete representation of a span for testing purposes.""" + + @dataclass(frozen=True) + class SpanContext: + trace_id: int + span_id: int + + @classmethod + def from_span_context( + cls, context: opentelemetry.trace.SpanContext + ) -> "SerialisableSpan.SpanContext": + return cls( + trace_id=context.trace_id, + span_id=context.span_id, + ) + + @classmethod + def from_optional_span_context( + cls, context: Optional[opentelemetry.trace.SpanContext] + ) -> Optional["SerialisableSpan.SpanContext"]: + if context is None: + return None + return cls.from_span_context(context) + + @dataclass(frozen=True) + class Link: + context: SerialisableSpan.SpanContext + attributes: Dict[str, Any] + + name: str + context: Optional[SpanContext] + parent: Optional[SpanContext] + attributes: Dict[str, Any] + links: Sequence[Link] + + @classmethod + def from_readable_span(cls, span: ReadableSpan) -> "SerialisableSpan": + return cls( + name=span.name, + context=cls.SpanContext.from_optional_span_context(span.context), + parent=cls.SpanContext.from_optional_span_context(span.parent), + 
attributes=dict(span.attributes or {}), + links=tuple( + cls.Link( + context=cls.SpanContext.from_span_context(link.context), + attributes=dict(span.attributes or {}), + ) + for link in span.links + ), + ) + + +SerialisableSpanListProxy: typing.TypeAlias = multiprocessing.managers.ListProxy[ + SerialisableSpan +] + + +def make_span_proxy_list( + manager: multiprocessing.managers.SyncManager, +) -> SerialisableSpanListProxy: + """Create a list proxy to share `SerialisableSpan` across processes.""" + return manager.list() + + +class _ListProxySpanExporter(SpanExporter): + """Implementation of :class:`SpanExporter` that exports spans to a + list proxy created by a multiprocessing manager. + + This class is used for testing multiprocessing setups, as we can get access + to the finished spans from the parent process. + + In production, you would use `OTLPSpanExporter` or similar to export spans. + Tracing is designed to be distributed, the child process can push collected + spans directly to a collector or backend, which can reassemble the spans + into a single trace. + """ + + def __init__(self, finished_spans: SerialisableSpanListProxy) -> None: + self._finished_spans = finished_spans + self._stopped = False + self._lock = threading.Lock() + + def export(self, spans: typing.Sequence[ReadableSpan]) -> SpanExportResult: + if self._stopped: + return SpanExportResult.FAILURE + with self._lock: + # Note: ReadableSpan is not picklable, so convert to a DTO + # Note: we could use `span.to_json()` but there isn't a `from_json` + # and the serialisation isn't easily reversible, e.g. 
`parent` context + # is lost, span/trace IDs are transformed into strings + self._finished_spans.extend( + [SerialisableSpan.from_readable_span(span) for span in spans] + ) + return SpanExportResult.SUCCESS + + def shutdown(self) -> None: + self._stopped = True + + def force_flush(self, timeout_millis: int = 30000) -> bool: + return True + + +def dump_spans( + spans: Iterable[Union[ReadableSpan, SerialisableSpan]], + *, + parent_id: Optional[int] = None, + with_attributes: bool = True, + indent_depth: int = 0, +) -> List[str]: + ret: List[str] = [] + for span in spans: + if (not span.parent and parent_id is None) or ( + span.parent and span.parent.span_id == parent_id + ): + span_str = f"{' ' * indent_depth}{span.name}" + if with_attributes: + span_str += f" (attributes: {dict(span.attributes or {})})" + # Add links + if span.links: + span_links: List[str] = [] + for link in span.links: + for link_span in spans: + if ( + link_span.context + and link_span.context.span_id == link.context.span_id + ): + span_links.append(link_span.name) + span_str += f" (links: {', '.join(span_links)})" + # Signals can duplicate in rare situations, so we make sure not to + # re-add + if "Signal" in span_str and span_str in ret: + continue + ret.append(span_str) + ret += dump_spans( + spans, + parent_id=(span.context.span_id if span.context else None), + with_attributes=with_attributes, + indent_depth=indent_depth + 1, + ) + return ret diff --git a/tests/contrib/test_opentelemetry.py b/tests/contrib/test_opentelemetry.py index bb3d21acd..c951f0bf7 100644 --- a/tests/contrib/test_opentelemetry.py +++ b/tests/contrib/test_opentelemetry.py @@ -5,22 +5,19 @@ import logging import multiprocessing import multiprocessing.managers -import threading import typing import uuid from dataclasses import dataclass from datetime import timedelta -from typing import Any, Dict, Iterable, List, Optional, Sequence, Union +from typing import List, Optional import opentelemetry.trace -from 
opentelemetry.sdk.trace import ReadableSpan, TracerProvider +from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import ( SimpleSpanProcessor, - SpanExporter, - SpanExportResult, ) from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter -from opentelemetry.trace import get_current_span, get_tracer +from opentelemetry.trace import get_tracer from temporalio import activity, workflow from temporalio.client import Client @@ -29,6 +26,13 @@ from temporalio.contrib.opentelemetry import workflow as otel_workflow from temporalio.testing import WorkflowEnvironment from temporalio.worker import SharedStateManager, UnsandboxedWorkflowRunner, Worker +from tests.contrib.opentelemetry.helpers.tracing import ( + SerialisableSpan, + _ListProxySpanExporter, + dump_spans, + make_span_proxy_list, + SerialisableSpanListProxy, +) # Passing through because Python 3.9 has an import bug at # https://github.com/python/cpython/issues/91351 @@ -89,58 +93,6 @@ class TracingWorkflowActionContinueAsNew: ready_for_update: asyncio.Semaphore -@dataclass(frozen=True) -class SerialisableSpan: - @dataclass(frozen=True) - class SpanContext: - trace_id: int - span_id: int - - @classmethod - def from_span_context( - cls, context: opentelemetry.trace.SpanContext - ) -> "SerialisableSpan.SpanContext": - return cls( - trace_id=context.trace_id, - span_id=context.span_id, - ) - - @classmethod - def from_optional_span_context( - cls, context: Optional[opentelemetry.trace.SpanContext] - ) -> Optional["SerialisableSpan.SpanContext"]: - if context is None: - return None - return cls.from_span_context(context) - - @dataclass(frozen=True) - class Link: - context: SerialisableSpan.SpanContext - attributes: Dict[str, Any] - - name: str - context: Optional[SpanContext] - parent: Optional[SpanContext] - attributes: Dict[str, Any] - links: Sequence[Link] - - @classmethod - def from_readable_span(cls, span: ReadableSpan) -> "SerialisableSpan": - return 
cls( - name=span.name, - context=cls.SpanContext.from_optional_span_context(span.context), - parent=cls.SpanContext.from_optional_span_context(span.parent), - attributes=dict(span.attributes or {}), - links=tuple( - cls.Link( - context=cls.SpanContext.from_span_context(link.context), - attributes=dict(span.attributes or {}), - ) - for link in span.links - ), - ) - - @workflow.defn class TracingWorkflow: def __init__(self) -> None: @@ -361,46 +313,6 @@ async def test_opentelemetry_tracing(client: Client, env: WorkflowEnvironment): ] -def dump_spans( - spans: Iterable[Union[ReadableSpan, SerialisableSpan]], - *, - parent_id: Optional[int] = None, - with_attributes: bool = True, - indent_depth: int = 0, -) -> List[str]: - ret: List[str] = [] - for span in spans: - if (not span.parent and parent_id is None) or ( - span.parent and span.parent.span_id == parent_id - ): - span_str = f"{' ' * indent_depth}{span.name}" - if with_attributes: - span_str += f" (attributes: {dict(span.attributes or {})})" - # Add links - if span.links: - span_links: List[str] = [] - for link in span.links: - for link_span in spans: - if ( - link_span.context - and link_span.context.span_id == link.context.span_id - ): - span_links.append(link_span.name) - span_str += f" (links: {', '.join(span_links)})" - # Signals can duplicate in rare situations, so we make sure not to - # re-add - if "Signal" in span_str and span_str in ret: - continue - ret.append(span_str) - ret += dump_spans( - spans, - parent_id=(span.context.span_id if span.context else None), - with_attributes=with_attributes, - indent_depth=indent_depth + 1, - ) - return ret - - @workflow.defn class SimpleWorkflow: @workflow.run @@ -503,9 +415,7 @@ async def test_activity_trace_propagation( # Create a proxy list using the server process manager which we'll use # to access finished spans in the process pool manager = multiprocessing.Manager() - finished_spans_proxy = typing.cast( - multiprocessing.managers.ListProxy[SerialisableSpan], 
manager.list() - ) + finished_spans_proxy = make_span_proxy_list(manager) # Create a worker with a process pool activity executor async with Worker( @@ -535,48 +445,8 @@ async def test_activity_trace_propagation( ] -class _ListProxySpanExporter(SpanExporter): - """Implementation of :class:`SpanExporter` that exports spans to a - list proxy created by a multiprocessing manager. - - This class is used for testing multiprocessing setups, as we can get access - to the finished spans from the parent process. - - In production, you would use `OTLPSpanExporter` or similar to export spans. - Tracing is designed to be distributed, the child process can push collected - spans directly to a collector or backend, which can reassemble the spans - into a single trace. - """ - - def __init__( - self, finished_spans: multiprocessing.managers.ListProxy[SerialisableSpan] - ) -> None: - self._finished_spans = finished_spans - self._stopped = False - self._lock = threading.Lock() - - def export(self, spans: typing.Sequence[ReadableSpan]) -> SpanExportResult: - if self._stopped: - return SpanExportResult.FAILURE - with self._lock: - # Note: ReadableSpan is not picklable, so convert to a DTO - # Note: we could use `span.to_json()` but there isn't a `from_json` - # and the serialisation isn't easily reversible, e.g. 
`parent` context - # is lost, span/trace IDs are transformed into strings - self._finished_spans.extend( - [SerialisableSpan.from_readable_span(span) for span in spans] - ) - return SpanExportResult.SUCCESS - - def shutdown(self) -> None: - self._stopped = True - - def force_flush(self, timeout_millis: int = 30000) -> bool: - return True - - def activity_trace_propagation_initializer( - _finished_spans_proxy: multiprocessing.managers.ListProxy[SerialisableSpan], + _finished_spans_proxy: SerialisableSpanListProxy, ) -> None: """Initializer for the process pool worker to export spans to a shared list.""" _exporter = _ListProxySpanExporter(_finished_spans_proxy) From c9b4b137eee5b3dd2f78d3c6ccefe934db4fb396 Mon Sep 17 00:00:00 2001 From: Greg Brown Date: Sun, 3 Aug 2025 20:34:45 +0100 Subject: [PATCH 5/6] wip: test reflection isn't broken --- .../helpers/reflection_interceptor.py | 77 +++++++++++++++++++ .../contrib/opentelemetry/helpers/tracing.py | 29 ++----- tests/contrib/test_opentelemetry.py | 28 +++++-- 3 files changed, 104 insertions(+), 30 deletions(-) create mode 100644 tests/contrib/opentelemetry/helpers/reflection_interceptor.py diff --git a/tests/contrib/opentelemetry/helpers/reflection_interceptor.py b/tests/contrib/opentelemetry/helpers/reflection_interceptor.py new file mode 100644 index 000000000..ec599b36b --- /dev/null +++ b/tests/contrib/opentelemetry/helpers/reflection_interceptor.py @@ -0,0 +1,77 @@ +import dataclasses +import logging +import typing + +import temporalio.worker + +logger = logging.getLogger(__name__) + + +@dataclasses.dataclass(frozen=True) +class InterceptedActivity: + class_name: str + name: typing.Optional[str] + qualname: typing.Optional[str] + module: typing.Optional[str] + annotations: typing.Dict[str, typing.Any] + docstring: typing.Optional[str] + + +class ReflectionInterceptor(temporalio.worker.Interceptor): + """Interceptor to check we haven't broken reflection when wrapping the activity.""" + + def __init__(self) -> 
None: + self._intercepted_activities: list[InterceptedActivity] = [] + + def get_intercepted_activities(self) -> typing.List[InterceptedActivity]: + """Get the list of intercepted activities.""" + return self._intercepted_activities + + def intercept_activity( + self, next: temporalio.worker.ActivityInboundInterceptor + ) -> temporalio.worker.ActivityInboundInterceptor: + """Method called for intercepting an activity. + + Args: + next: The underlying inbound interceptor this interceptor should + delegate to. + + Returns: + The new interceptor that will be used for the activity. + """ + return _ReflectionActivityInboundInterceptor(next, self) + + +class _ReflectionActivityInboundInterceptor( + temporalio.worker.ActivityInboundInterceptor +): + def __init__( + self, + next: temporalio.worker.ActivityInboundInterceptor, + root: ReflectionInterceptor, + ) -> None: + super().__init__(next) + self.root = root + + async def execute_activity( + self, input: temporalio.worker.ExecuteActivityInput + ) -> typing.Any: + """Called to invoke the activity.""" + + try: + self.root._intercepted_activities.append( + InterceptedActivity( + class_name=input.fn.__class__.__name__, + name=getattr(input.fn, "__name__", None), + qualname=getattr(input.fn, "__qualname__", None), + module=getattr(input.fn, "__module__", None), + docstring=getattr(input.fn, "__doc__", None), + annotations=getattr(input.fn, "__annotations__", {}), + ) + ) + except AttributeError: + logger.exception( + "Activity function does not have expected attributes, skipping reflection." 
+ ) + + return await self.next.execute_activity(input) diff --git a/tests/contrib/opentelemetry/helpers/tracing.py b/tests/contrib/opentelemetry/helpers/tracing.py index c952d3af3..cbf7cddae 100644 --- a/tests/contrib/opentelemetry/helpers/tracing.py +++ b/tests/contrib/opentelemetry/helpers/tracing.py @@ -1,34 +1,18 @@ from __future__ import annotations -import asyncio -import concurrent.futures -import logging import multiprocessing import multiprocessing.managers import threading import typing -import uuid from dataclasses import dataclass -from datetime import timedelta from typing import Any, Dict, Iterable, List, Optional, Sequence, Union import opentelemetry.trace -from opentelemetry.sdk.trace import ReadableSpan, TracerProvider +from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.sdk.trace.export import ( - SimpleSpanProcessor, SpanExporter, SpanExportResult, ) -from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter -from opentelemetry.trace import get_current_span, get_tracer - -from temporalio import activity, workflow -from temporalio.client import Client -from temporalio.common import RetryPolicy -from temporalio.contrib.opentelemetry import TracingInterceptor -from temporalio.contrib.opentelemetry import workflow as otel_workflow -from temporalio.testing import WorkflowEnvironment -from temporalio.worker import SharedStateManager, UnsandboxedWorkflowRunner, Worker @dataclass(frozen=True) @@ -85,14 +69,9 @@ def from_readable_span(cls, span: ReadableSpan) -> "SerialisableSpan": ) -SerialisableSpanListProxy: typing.TypeAlias = multiprocessing.managers.ListProxy[ - SerialisableSpan -] - - def make_span_proxy_list( manager: multiprocessing.managers.SyncManager, -) -> SerialisableSpanListProxy: +) -> multiprocessing.managers.ListProxy[SerialisableSpan]: """Create a list proxy to share `SerialisableSpan` across processes.""" return manager.list() @@ -110,7 +89,9 @@ class _ListProxySpanExporter(SpanExporter): 
into a single trace. """ - def __init__(self, finished_spans: SerialisableSpanListProxy) -> None: + def __init__( + self, finished_spans: multiprocessing.managers.ListProxy[SerialisableSpan] + ) -> None: self._finished_spans = finished_spans self._stopped = False self._lock = threading.Lock() diff --git a/tests/contrib/test_opentelemetry.py b/tests/contrib/test_opentelemetry.py index c951f0bf7..1645d371c 100644 --- a/tests/contrib/test_opentelemetry.py +++ b/tests/contrib/test_opentelemetry.py @@ -26,12 +26,15 @@ from temporalio.contrib.opentelemetry import workflow as otel_workflow from temporalio.testing import WorkflowEnvironment from temporalio.worker import SharedStateManager, UnsandboxedWorkflowRunner, Worker +from tests.contrib.opentelemetry.helpers.reflection_interceptor import ( + InterceptedActivity, + ReflectionInterceptor, +) from tests.contrib.opentelemetry.helpers.tracing import ( SerialisableSpan, _ListProxySpanExporter, dump_spans, make_span_proxy_list, - SerialisableSpanListProxy, ) # Passing through because Python 3.9 has an import bug at @@ -403,9 +406,6 @@ async def test_activity_trace_propagation( client: Client, env: WorkflowEnvironment, ): - # TODO: add spy interceptor to check `input.fn` wraps original metadata - # TODO: Add Resource to show how resource would be propagated - # Create a tracer that has an in-memory exporter exporter = InMemorySpanExporter() provider = TracerProvider() @@ -417,13 +417,16 @@ async def test_activity_trace_propagation( manager = multiprocessing.Manager() finished_spans_proxy = make_span_proxy_list(manager) + # Create an interceptor to test we haven't broken reflection + reflection_interceptor = ReflectionInterceptor() + # Create a worker with a process pool activity executor async with Worker( client, task_queue=f"task_queue_{uuid.uuid4()}", workflows=[ActivityTracePropagationWorkflow], activities=[sync_activity], - interceptors=[TracingInterceptor(tracer)], + interceptors=[TracingInterceptor(tracer), 
reflection_interceptor], activity_executor=concurrent.futures.ProcessPoolExecutor( max_workers=1, initializer=activity_trace_propagation_initializer, @@ -437,6 +440,7 @@ async def test_activity_trace_propagation( task_queue=worker.task_queue, ) + # The dumped spans should include child spans created in the child process spans = exporter.get_finished_spans() + tuple(finished_spans_proxy) logging.debug("Spans:\n%s", "\n".join(dump_spans(spans, with_attributes=False))) assert dump_spans(spans, with_attributes=False) == [ @@ -444,9 +448,21 @@ async def test_activity_trace_propagation( " child_span", ] + # and the activity should still have the original attributes in downstream interceptors + assert reflection_interceptor.get_intercepted_activities() == [ + InterceptedActivity( + class_name="ActivityFnWithTraceContext", + name="sync_activity", + qualname="sync_activity", + module="tests.contrib.test_opentelemetry", + docstring="An activity that uses tracing features.\n\nWhen executed in a process pool, we expect the trace context to be available\nfrom the parent process.\n", + annotations={"param": "typing.Any", "return": "str"}, + ) + ] + def activity_trace_propagation_initializer( - _finished_spans_proxy: SerialisableSpanListProxy, + _finished_spans_proxy: multiprocessing.managers.ListProxy[SerialisableSpan], ) -> None: """Initializer for the process pool worker to export spans to a shared list.""" _exporter = _ListProxySpanExporter(_finished_spans_proxy) From acd0bb802c936900bd21ca6009afc43c428c5070 Mon Sep 17 00:00:00 2001 From: Greg Brown Date: Mon, 4 Aug 2025 00:08:28 +0100 Subject: [PATCH 6/6] wip: fix failing test on Python 3.9 - For some reason, the docstring comparison for the reflection check seemed to fail in Python 3.9 - I shortened the docstring to make it easier to compare in VSCode test output, that seemed to fix the test. Maybe 3.9 doesn't strip leading spaces in the docstring (e.g. like textwrap.dedent)? 
--- tests/contrib/test_opentelemetry.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/tests/contrib/test_opentelemetry.py b/tests/contrib/test_opentelemetry.py index 1645d371c..2695f2b3a 100644 --- a/tests/contrib/test_opentelemetry.py +++ b/tests/contrib/test_opentelemetry.py @@ -390,11 +390,7 @@ async def run(self) -> str: @activity.defn def sync_activity(param: typing.Any) -> str: - """An activity that uses tracing features. - - When executed in a process pool, we expect the trace context to be available - from the parent process. - """ + """An activity that uses tracing features.""" inner_tracer = get_tracer("sync_activity") with inner_tracer.start_as_current_span( "child_span", @@ -455,7 +451,7 @@ async def test_activity_trace_propagation( name="sync_activity", qualname="sync_activity", module="tests.contrib.test_opentelemetry", - docstring="An activity that uses tracing features.\n\nWhen executed in a process pool, we expect the trace context to be available\nfrom the parent process.\n", + docstring="An activity that uses tracing features.", annotations={"param": "typing.Any", "return": "str"}, ) ]