diff --git a/changelog.d/19007.misc b/changelog.d/19007.misc
new file mode 100644
index 00000000000..720623e98ec
--- /dev/null
+++ b/changelog.d/19007.misc
@@ -0,0 +1 @@
+Switch back to our own custom `LogContextScopeManager` instead of OpenTracing's `ContextVarsScopeManager`, which was causing problems when using the experimental `SYNAPSE_ASYNC_IO_REACTOR` option with tracing enabled.
diff --git a/synapse/logging/context.py b/synapse/logging/context.py
index 119d3be7bf4..5cfd8616854 100644
--- a/synapse/logging/context.py
+++ b/synapse/logging/context.py
@@ -56,6 +56,7 @@ from twisted.python.threadpool import ThreadPool
 
 if TYPE_CHECKING:
+    from synapse.logging.scopecontextmanager import _LogContextScope
     from synapse.types import ISynapseReactor
 
 logger = logging.getLogger(__name__)
@@ -238,7 +239,14 @@ class _Sentinel:
     we should always know which server the logs are coming from.
     """
 
-    __slots__ = ["previous_context", "finished", "server_name", "request", "tag"]
+    __slots__ = [
+        "previous_context",
+        "finished",
+        "scope",
+        "server_name",
+        "request",
+        "tag",
+    ]
 
     def __init__(self) -> None:
         # Minimal set for compatibility with LoggingContext
@@ -246,6 +254,7 @@ def __init__(self) -> None:
         self.finished = False
         self.server_name = "unknown_server_from_sentinel_context"
         self.request = None
+        self.scope = None
         self.tag = None
 
     def __str__(self) -> str:
@@ -303,6 +312,7 @@ class LoggingContext:
         "finished",
         "request",
         "tag",
+        "scope",
     ]
 
     def __init__(
@@ -327,6 +337,7 @@ def __init__(
         self.main_thread = get_thread_id()
         self.request = None
         self.tag = ""
+        self.scope: Optional["_LogContextScope"] = None
 
         # keep track of whether we have hit the __exit__ block for this context
         # (suggesting that the thing that created the context thinks it should
@@ -340,6 +351,9 @@ def __init__(
             # which request this corresponds to
             self.request = self.parent_context.request
 
+            # we also track the current scope:
+            self.scope = self.parent_context.scope
+
         if request is not None:
             # the request param overrides the request from the parent context
             self.request = request
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index 8d350016ceb..1c89a358dfc 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -251,17 +251,18 @@ class _DummyTagNames:
 try:
     import opentracing
     import opentracing.tags
-    from opentracing.scope_managers.contextvars import ContextVarsScopeManager
 
     tags = opentracing.tags
 except ImportError:
     opentracing = None  # type: ignore[assignment]
     tags = _DummyTagNames  # type: ignore[assignment]
-    ContextVarsScopeManager = None  # type: ignore
 
 try:
     from jaeger_client import Config as JaegerConfig
+
+    from synapse.logging.scopecontextmanager import LogContextScopeManager
 except ImportError:
     JaegerConfig = None  # type: ignore
+    LogContextScopeManager = None  # type: ignore
 
 try:
@@ -483,7 +484,7 @@ def init_tracer(hs: "HomeServer") -> None:
     config = JaegerConfig(
         config=jaeger_config,
         service_name=f"{hs.config.server.server_name} {instance_name_by_type}",
-        scope_manager=ContextVarsScopeManager(),
+        scope_manager=LogContextScopeManager(),
         metrics_factory=PrometheusMetricsFactory(),
     )
 
@@ -683,9 +684,21 @@ def start_active_span_from_edu(
 
 # Opentracing setters for tags, logs, etc
 @only_if_tracing
-def active_span() -> Optional["opentracing.Span"]:
-    """Get the currently active span, if any"""
-    return opentracing.tracer.active_span
+def active_span(
+    *,
+    tracer: Optional["opentracing.Tracer"] = None,
+) -> Optional["opentracing.Span"]:
+    """
+    Get the currently active span, if any
+
+    Args:
+        tracer: override the opentracing tracer. By default the global tracer is used.
+    """
+    if tracer is None:
+        # use the global tracer by default
+        tracer = opentracing.tracer
+
+    return tracer.active_span
 
 
 @ensure_active_span("set a tag")
diff --git a/synapse/logging/scopecontextmanager.py b/synapse/logging/scopecontextmanager.py
new file mode 100644
index 00000000000..feaadc4d87a
--- /dev/null
+++ b/synapse/logging/scopecontextmanager.py
@@ -0,0 +1,161 @@
+#
+# This file is licensed under the Affero General Public License (AGPL) version 3.
+#
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+# Copyright (C) 2023 New Vector, Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# See the GNU Affero General Public License for more details:
+# <https://www.gnu.org/licenses/agpl-3.0.html>.
+#
+# Originally licensed under the Apache License, Version 2.0:
+# <http://www.apache.org/licenses/LICENSE-2.0>.
+#
+# [This file includes modifications made by New Vector Limited]
+#
+#
+
+import logging
+from typing import Optional
+
+from opentracing import Scope, ScopeManager, Span
+
+from synapse.logging.context import (
+    LoggingContext,
+    current_context,
+    nested_logging_context,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class LogContextScopeManager(ScopeManager):
+    """
+    The LogContextScopeManager tracks the active scope in opentracing
+    by using the log contexts which are native to synapse. This is so
+    that the basic opentracing api can be used across twisted deferreds.
+
+    It would be nice just to use opentracing's ContextVarsScopeManager,
+    but currently that doesn't work due to https://twistedmatrix.com/trac/ticket/10301.
+    """
+
+    def __init__(self) -> None:
+        pass
+
+    @property
+    def active(self) -> Optional[Scope]:
+        """
+        Returns the currently active Scope which can be used to access the
+        currently active Scope.span.
+        If there is a non-null Scope, its wrapped Span
+        becomes an implicit parent of any newly-created Span at
+        Tracer.start_active_span() time.
+
+        Returns:
+            The Scope that is active, or None if not available.
+        """
+        ctx = current_context()
+        return ctx.scope
+
+    def activate(self, span: Span, finish_on_close: bool) -> Scope:
+        """
+        Makes a Span active.
+        Args:
+            span: the span that should become active.
+            finish_on_close: whether Span should be automatically finished when
+                Scope.close() is called.
+
+        Returns:
+            Scope to control the end of the active period for
+            *span*. It is a programming error to neglect to call
+            Scope.close() on the returned instance.
+        """
+
+        ctx = current_context()
+
+        if not ctx:
+            logger.error("Tried to activate scope outside of loggingcontext")
+            return Scope(None, span)  # type: ignore[arg-type]
+
+        if ctx.scope is not None:
+            # start a new logging context as a child of the existing one.
+            # Doing so -- rather than updating the existing logcontext -- means that
+            # creating several concurrent spans under the same logcontext works
+            # correctly.
+            ctx = nested_logging_context("")
+            enter_logcontext = True
+        else:
+            # if there is no span currently associated with the current logcontext, we
+            # just store the scope in it.
+            #
+            # This feels a bit dubious, but it does hack around a problem where a
+            # span outlasts its parent logcontext (which would otherwise lead to
+            # "Re-starting finished log context" errors).
+            enter_logcontext = False
+
+        scope = _LogContextScope(self, span, ctx, enter_logcontext, finish_on_close)
+        ctx.scope = scope
+        if enter_logcontext:
+            ctx.__enter__()
+
+        return scope
+
+
+class _LogContextScope(Scope):
+    """
+    A custom opentracing scope, associated with a LogContext
+
+    * When the scope is closed, the logcontext's active scope is reset to None,
+      and (if enter_logcontext was set) the logcontext is exited too.
+    """
+
+    def __init__(
+        self,
+        manager: LogContextScopeManager,
+        span: Span,
+        logcontext: LoggingContext,
+        enter_logcontext: bool,
+        finish_on_close: bool,
+    ):
+        """
+        Args:
+            manager:
+                the manager that is responsible for this scope.
+            span:
+                the opentracing span which this scope represents the local
+                lifetime for.
+            logcontext:
+                the log context to which this scope is attached.
+            enter_logcontext:
+                if True, the logcontext will be exited when the scope is closed.
+            finish_on_close:
+                if True, the span will be finished when the scope is closed.
+        """
+        super().__init__(manager, span)
+        self.logcontext = logcontext
+        self._finish_on_close = finish_on_close
+        self._enter_logcontext = enter_logcontext
+
+    def __str__(self) -> str:
+        return f"Scope<{self.span}>"
+
+    def close(self) -> None:
+        active_scope = self.manager.active
+        if active_scope is not self:
+            logger.error(
+                "Closing scope %s which is not the currently-active one %s",
+                self,
+                active_scope,
+            )
+
+        if self._finish_on_close:
+            self.span.finish()
+
+        self.logcontext.scope = None
+
+        if self._enter_logcontext:
+            self.logcontext.__exit__(None, None, None)
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index 6dc2cbe1322..05e84038acd 100644
--- a/synapse/metrics/background_process_metrics.py
+++ b/synapse/metrics/background_process_metrics.py
@@ -68,6 +68,11 @@
     from synapse.server import HomeServer
 
+try:
+    import opentracing
+except ImportError:
+    opentracing = None  # type: ignore[assignment]
+
 logger = logging.getLogger(__name__)
@@ -225,6 +230,7 @@ def run_as_background_process(
     func: Callable[..., Awaitable[Optional[R]]],
     *args: Any,
     bg_start_span: bool = True,
+    test_only_tracer: Optional["opentracing.Tracer"] = None,
     **kwargs: Any,
 ) -> "defer.Deferred[Optional[R]]":
     """Run the given function in its own logcontext, with resource metrics
@@ -250,6 +256,8 @@ def run_as_background_process(
         bg_start_span: Whether to start an opentracing span. Defaults to True.
             Should only be disabled for processes that will not log to or tag
             a span.
+        test_only_tracer: Set the OpenTracing tracer to use. This is only useful for
+            tests.
         args: positional args for func
         kwargs: keyword args for func
 
@@ -259,6 +267,12 @@ def run_as_background_process(
         rules.
     """
 
+    # Since we track the tracing scope in the `LoggingContext`, before we move to the
+    # sentinel logcontext (or a new `LoggingContext`), grab the currently active
+    # tracing span (if any) so that we can create a cross-link to the background process
+    # trace.
+    original_active_tracing_span = active_span(tracer=test_only_tracer)
+
     async def run() -> Optional[R]:
         with _bg_metrics_lock:
             count = _background_process_counts.get(desc, 0)
@@ -276,8 +290,6 @@ async def run() -> Optional[R]:
         ) as logging_context:
             try:
                 if bg_start_span:
-                    original_active_tracing_span = active_span()
-
                     # If there is already an active span (e.g. because this background
                     # process was started as part of handling a request for example),
                     # because this is a long-running background task that may serve a
@@ -308,6 +320,7 @@ async def run() -> Optional[R]:
                             # Create a root span for the background process (disconnected
                             # from other spans)
                             ignore_active_span=True,
+                            tracer=test_only_tracer,
                         )
 
                         # Also add a span in the original request trace that cross-links
@@ -324,8 +337,11 @@ async def run() -> Optional[R]:
                             f"start_bgproc.{desc}",
                             child_of=original_active_tracing_span,
                             ignore_active_span=True,
-                            # Points to the background process span.
+                            # Create the `FOLLOWS_FROM` reference to the background
+                            # process span so there is a loose coupling between the two
+                            # traces and it's easy to jump between.
                             contexts=[root_tracing_scope.span.context],
+                            tracer=test_only_tracer,
                         ):
                             pass
@@ -341,6 +357,7 @@ async def run() -> Optional[R]:
                             # span so there is a loose coupling between the two
                             # traces and it's easy to jump between.
                             contexts=[original_active_tracing_span.context],
+                            tracer=test_only_tracer,
                         )
 
                     # For easy usage down below, we create a context manager that
@@ -359,6 +376,7 @@ def combined_context_manager() -> Generator[None, None, None]:
                     tracing_scope = start_active_span(
                         f"bgproc.{desc}",
                         tags={SynapseTags.REQUEST_ID: str(logging_context)},
+                        tracer=test_only_tracer,
                     )
                 else:
                     tracing_scope = nullcontext()
diff --git a/tests/logging/test_opentracing.py b/tests/logging/test_opentracing.py
index 31cdfacd2cd..2f389f7f44f 100644
--- a/tests/logging/test_opentracing.py
+++ b/tests/logging/test_opentracing.py
@@ -19,7 +19,7 @@
 #
 #
 
-from typing import Awaitable, Dict, cast
+from typing import Awaitable, Optional, cast
 
 from twisted.internet import defer
 from twisted.internet.testing import MemoryReactorClock
@@ -35,20 +35,25 @@
     tag_args,
     trace_with_opname,
 )
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.clock import Clock
 
-try:
-    import opentracing
-    from opentracing.scope_managers.contextvars import ContextVarsScopeManager
-except ImportError:
-    opentracing = None  # type: ignore
-    ContextVarsScopeManager = None  # type: ignore
+from tests.server import get_clock
 
 try:
     import jaeger_client
 except ImportError:
     jaeger_client = None  # type: ignore
 
+
+try:
+    import opentracing
+
+    from synapse.logging.scopecontextmanager import LogContextScopeManager
+except ImportError:
+    opentracing = None  # type: ignore
+    LogContextScopeManager = None  # type: ignore
+
 import logging
 
 from tests.unittest import TestCase
@@ -56,7 +61,7 @@
 logger = logging.getLogger(__name__)
 
 
-class TracingScopeTestCase(TestCase):
+class LogContextScopeManagerTestCase(TestCase):
     """
     Test that our tracing machinery works well in a variety of situations
     (especially with Twisted's runtime and deferreds).
@@ -67,7 +72,7 @@ class TracingScopeTestCase(TestCase):
     opentracing backend is Jaeger.
     """
 
-    if opentracing is None:
+    if opentracing is None or LogContextScopeManager is None:
         skip = "Requires opentracing"  # type: ignore[unreachable]
     if jaeger_client is None:
         skip = "Requires jaeger_client"  # type: ignore[unreachable]
@@ -77,9 +82,8 @@ def setUp(self) -> None:
         # global variables that power opentracing. We create our own tracer instance
         # and test with it.
-        scope_manager = ContextVarsScopeManager()
         config = jaeger_client.config.Config(
-            config={}, service_name="test", scope_manager=scope_manager
+            config={}, service_name="test", scope_manager=LogContextScopeManager()
         )
 
         self._reporter = jaeger_client.reporter.InMemoryReporter()
@@ -220,144 +224,6 @@ async def root() -> None:
             [scopes[1].span, scopes[2].span, scopes[0].span],
         )
 
-    def test_run_in_background_active_scope_still_available(self) -> None:
-        """
-        Test that tasks running via `run_in_background` still have access to the
-        active tracing scope.
-
-        This is a regression test for a previous Synapse issue where the tracing scope
-        would `__exit__` and close before the `run_in_background` task completed and our
-        own previous custom `_LogContextScope.close(...)` would clear
-        `LoggingContext.scope` preventing further tracing spans from having the correct
-        parent.
-        """
-        reactor = MemoryReactorClock()
-        # type-ignore: mypy-zope doesn't seem to recognise that `MemoryReactorClock`
-        # implements `ISynapseThreadlessReactor` (combination of the normal Twisted
-        # Reactor/Clock interfaces), via inheritance from
-        # `twisted.internet.testing.MemoryReactor` and `twisted.internet.testing.Clock`
-        # Ignore `multiple-internal-clocks` linter error here since we are creating a `Clock`
-        # for testing purposes.
-        clock = Clock(  # type: ignore[multiple-internal-clocks]
-            reactor,  # type: ignore[arg-type]
-            server_name="test_server",
-        )
-
-        scope_map: Dict[str, opentracing.Scope] = {}
-
-        async def async_task() -> None:
-            root_scope = scope_map["root"]
-            root_context = cast(jaeger_client.SpanContext, root_scope.span.context)
-
-            self.assertEqual(
-                self._tracer.active_span,
-                root_scope.span,
-                "expected to inherit the root tracing scope from where this was run",
-            )
-
-            # Return control back to the reactor thread and wait an arbitrary amount
-            await clock.sleep(4)
-
-            # This is a key part of what we're testing! In a previous version of
-            # Synapse, we would lose the active span at this point.
-            self.assertEqual(
-                self._tracer.active_span,
-                root_scope.span,
-                "expected to still have a root tracing scope/span active",
-            )
-
-            # For complete-ness sake, let's also trace more sub-tasks here and assert
-            # they have the correct span parents as well (root)
-
-            # Start tracing some other sub-task.
-            #
-            # This is a key part of what we're testing! In a previous version of
-            # Synapse, it would have the incorrect span parents.
-            scope = start_active_span(
-                "task1",
-                tracer=self._tracer,
-            )
-            scope_map["task1"] = scope
-
-            # Ensure the span parent is pointing to the root scope
-            context = cast(jaeger_client.SpanContext, scope.span.context)
-            self.assertEqual(
-                context.parent_id,
-                root_context.span_id,
-                "expected task1 parent to be the root span",
-            )
-
-            # Ensure that the active span is our new sub-task now
-            self.assertEqual(self._tracer.active_span, scope.span)
-            # Return control back to the reactor thread and wait an arbitrary amount
-            await clock.sleep(4)
-            # We should still see the active span as the scope wasn't closed yet
-            self.assertEqual(self._tracer.active_span, scope.span)
-            scope.close()
-
-        async def root() -> None:
-            with start_active_span(
-                "root span",
-                tracer=self._tracer,
-                # We will close this off later. We're basically just mimicking the same
-                # pattern for how we handle requests. We pass the span off to the
-                # request for it to finish.
-                finish_on_close=False,
-            ) as root_scope:
-                scope_map["root"] = root_scope
-                self.assertEqual(self._tracer.active_span, root_scope.span)
-
-                # Fire-and-forget a task
-                #
-                # XXX: The root scope context manager will `__exit__` before this task
-                # completes.
-                run_in_background(async_task)
-
-                # Because we used `run_in_background`, the active span should still be
-                # the root.
-                self.assertEqual(self._tracer.active_span, root_scope.span)
-
-            # We shouldn't see any active spans outside of the scope
-            self.assertIsNone(self._tracer.active_span)
-
-        with LoggingContext(name="root context", server_name="test_server"):
-            # Start the test off
-            d_root = defer.ensureDeferred(root())
-
-            # Let the tasks complete
-            reactor.pump((2,) * 8)
-            self.successResultOf(d_root)
-
-            # After we see all of the tasks are done (like a request when it
-            # `_finished_processing`), let's finish our root span
-            scope_map["root"].span.finish()
-
-            # Sanity check again: We shouldn't see any active spans leftover in this
-            # this context.
-            self.assertIsNone(self._tracer.active_span)
-
-            # The spans should be reported in order of their finishing: task 1, task 2,
-            # root.
-            #
-            # We use `assertIncludes` just as an easier way to see if items are missing or
-            # added. We assert the order just below
-            self.assertIncludes(
-                set(self._reporter.get_spans()),
-                {
-                    scope_map["task1"].span,
-                    scope_map["root"].span,
-                },
-                exact=True,
-            )
-            # This is where we actually assert the correct order
-            self.assertEqual(
-                self._reporter.get_spans(),
-                [
-                    scope_map["task1"].span,
-                    scope_map["root"].span,
-                ],
-            )
-
     def test_trace_decorator_sync(self) -> None:
         """
         Test whether we can use `@trace_with_opname` (`@trace`) and `@tag_args`
@@ -455,3 +321,203 @@ async def runner() -> str:
             [span.operation_name for span in self._reporter.get_spans()],
             ["fixture_awaitable_return_func"],
         )
+
+    async def test_run_as_background_process_standalone(self) -> None:
+        """
+        Test to make sure that background process work starts its own trace.
+        """
+        reactor, clock = get_clock()
+
+        callback_finished = False
+        active_span_in_callback: Optional[jaeger_client.Span] = None
+
+        async def bg_task() -> None:
+            nonlocal callback_finished, active_span_in_callback
+            try:
+                assert isinstance(self._tracer.active_span, jaeger_client.Span)
+                active_span_in_callback = self._tracer.active_span
+            finally:
+                # When exceptions happen, we still want to mark the callback as finished
+                # so that the test can complete and we see the underlying error.
+                callback_finished = True
+
+        # type-ignore: We ignore because the point is to test the bare function
+        run_as_background_process(  # type: ignore[untracked-background-process]
+            desc="some-bg-task",
+            server_name="test_server",
+            func=bg_task,
+            test_only_tracer=self._tracer,
+        )
+
+        # Now wait for the background process to finish
+        while not callback_finished:
+            await clock.sleep(0)
+
+        self.assertTrue(
+            callback_finished,
+            "Callback never finished, which means the test probably didn't wait long enough",
+        )
+
+        self.assertEqual(
+            active_span_in_callback.operation_name if active_span_in_callback else None,
+            "bgproc.some-bg-task",
+            "expected a new span to be started for the background task",
+        )
+
+        # The spans should be reported in order of their finishing.
+        #
+        # We use `assertIncludes` just as an easier way to see if items are missing or
+        # added. We assert the order just below.
+        actual_spans = [span.operation_name for span in self._reporter.get_spans()]
+        expected_spans = ["bgproc.some-bg-task"]
+        self.assertIncludes(
+            set(actual_spans),
+            set(expected_spans),
+            exact=True,
+        )
+        # This is where we actually assert the correct order
+        self.assertEqual(
+            actual_spans,
+            expected_spans,
+        )
+
+    async def test_run_as_background_process_cross_link(self) -> None:
+        """
+        Test to make sure that background process work has its own trace and is
+        disconnected from any currently active trace (like a request). But we should
+        still have cross-links between the two traces if there was already an active
+        trace/span when we kicked off the background process.
+        """
+        reactor, clock = get_clock()
+
+        callback_finished = False
+        active_span_in_callback: Optional[jaeger_client.Span] = None
+
+        async def bg_task() -> None:
+            nonlocal callback_finished, active_span_in_callback
+            try:
+                assert isinstance(self._tracer.active_span, jaeger_client.Span)
+                active_span_in_callback = self._tracer.active_span
+            finally:
+                # When exceptions happen, we still want to mark the callback as finished
+                # so that the test can complete and we see the underlying error.
+                callback_finished = True
+
+        with LoggingContext(name="some-request", server_name="test_server"):
+            with start_active_span(
+                "some-request",
+                tracer=self._tracer,
+            ):
+                # type-ignore: We ignore because the point is to test the bare function
+                run_as_background_process(  # type: ignore[untracked-background-process]
+                    desc="some-bg-task",
+                    server_name="test_server",
+                    func=bg_task,
+                    test_only_tracer=self._tracer,
+                )
+
+        # Now wait for the background process to finish
+        while not callback_finished:
+            await clock.sleep(0)
+
+        self.assertTrue(
+            callback_finished,
+            "Callback never finished, which means the test probably didn't wait long enough",
+        )
+
+        # We start `bgproc.some-bg-task` and `bgproc_child.some-bg-task` (see
+        # `run_as_background_process` implementation for why). Either is fine, but for
+        # now we expect the child as it's the innermost one that was started.
+        self.assertEqual(
+            active_span_in_callback.operation_name if active_span_in_callback else None,
+            "bgproc_child.some-bg-task",
+            "expected a new span to be started for the background task",
+        )
+
+        # The spans should be reported in order of their finishing.
+        #
+        # We use `assertIncludes` just as an easier way to see if items are missing or
+        # added. We assert the order just below.
+        actual_spans = [span.operation_name for span in self._reporter.get_spans()]
+        expected_spans = [
+            "start_bgproc.some-bg-task",
+            "bgproc_child.some-bg-task",
+            "bgproc.some-bg-task",
+            "some-request",
+        ]
+        self.assertIncludes(
+            set(actual_spans),
+            set(expected_spans),
+            exact=True,
+        )
+        # This is where we actually assert the correct order
+        self.assertEqual(
+            actual_spans,
+            expected_spans,
+        )
+
+        span_map = {span.operation_name: span for span in self._reporter.get_spans()}
+        span_id_to_friendly_name = {
+            span.span_id: span.operation_name for span in self._reporter.get_spans()
+        }
+
+        def get_span_friendly_name(span_id: Optional[int]) -> str:
+            if span_id is None:
+                return "None"
+
+            return span_id_to_friendly_name.get(span_id, f"unknown span {span_id}")
+
+        # Ensure the background process trace/span is disconnected from the request
+        # trace/span.
+        self.assertNotEqual(
+            get_span_friendly_name(span_map["bgproc.some-bg-task"].parent_id),
+            get_span_friendly_name(span_map["some-request"].span_id),
+        )
+
+        # We should see a cross-link in the request trace pointing to the background
+        # process trace.
+        #
+        # Make sure `start_bgproc.some-bg-task` is part of the request trace
+        self.assertEqual(
+            get_span_friendly_name(span_map["start_bgproc.some-bg-task"].parent_id),
+            get_span_friendly_name(span_map["some-request"].span_id),
+        )
+        # And has some references to the background process trace
+        self.assertIncludes(
+            {
+                f"{reference.type}:{get_span_friendly_name(reference.referenced_context.span_id)}"
+                if isinstance(reference.referenced_context, jaeger_client.SpanContext)
+                else f"{reference.type}:None"
+                for reference in (
+                    span_map["start_bgproc.some-bg-task"].references or []
+                )
+            },
+            {
+                f"follows_from:{get_span_friendly_name(span_map['bgproc.some-bg-task'].span_id)}"
+            },
+            exact=True,
+        )
+
+        # We should see a cross-link in the background process trace pointing to the
+        # request trace that kicked off the work.
+        #
+        # Make sure `bgproc_child.some-bg-task` is part of the background process trace
+        self.assertEqual(
+            get_span_friendly_name(span_map["bgproc_child.some-bg-task"].parent_id),
+            get_span_friendly_name(span_map["bgproc.some-bg-task"].span_id),
+        )
+        # And has a reference back to the request trace that kicked off the work
+        self.assertIncludes(
+            {
+                f"{reference.type}:{get_span_friendly_name(reference.referenced_context.span_id)}"
+                if isinstance(reference.referenced_context, jaeger_client.SpanContext)
+                else f"{reference.type}:None"
+                for reference in (
+                    span_map["bgproc_child.some-bg-task"].references or []
+                )
+            },
+            {
+                f"follows_from:{get_span_friendly_name(span_map['some-request'].span_id)}"
+            },
+            exact=True,
+        )
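
Reviewer note (not part of the diff): below is a minimal sketch of how the restored scope manager is expected to behave, assuming tracing has been initialised with `LogContextScopeManager` as `init_tracer` now does. The `sketch` function and the `example.com` server name are purely illustrative:

```python
# Illustrative sketch only. Assumes `init_tracer` has already configured the
# global tracer with the LogContextScopeManager from this diff.
from synapse.logging.context import LoggingContext
from synapse.logging.opentracing import active_span, start_active_span


def sketch() -> None:
    # Spans must be started inside a logcontext: `LogContextScopeManager.activate`
    # stores the new scope on the current `LoggingContext`, and `active_span()`
    # resolves it via `current_context().scope` rather than via a contextvar.
    with LoggingContext(name="request", server_name="example.com"):
        with start_active_span("handle_request") as scope:
            # Anything that follows Synapse's logcontext rules (e.g. work
            # resumed across Twisted deferreds) sees the same active span.
            assert active_span() is scope.span

        # Closing the scope resets `LoggingContext.scope`, so no span leaks
        # into later work on this logcontext.
        assert active_span() is None
```

The point of the switch is that the active scope now follows Synapse's logcontext discipline rather than `contextvars`, which (per the Twisted ticket referenced in `scopecontextmanager.py`) does not propagate reliably across deferreds and misbehaved under the experimental `SYNAPSE_ASYNC_IO_REACTOR` option.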