Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/19007.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Switch back to our own custom `LogContextScopeManager` instead of OpenTracing's `ContextVarsScopeManager` which was causing problems when using the experimental `SYNAPSE_ASYNC_IO_REACTOR` option with tracing enabled.
16 changes: 15 additions & 1 deletion synapse/logging/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
from twisted.python.threadpool import ThreadPool

if TYPE_CHECKING:
from synapse.logging.scopecontextmanager import _LogContextScope
from synapse.types import ISynapseReactor

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -238,14 +239,22 @@ class _Sentinel:
we should always know which server the logs are coming from.
"""

__slots__ = ["previous_context", "finished", "server_name", "request", "tag"]
__slots__ = [
"previous_context",
"finished",
"scope",
"server_name",
"request",
"tag",
]

def __init__(self) -> None:
# Minimal set for compatibility with LoggingContext
self.previous_context = None
self.finished = False
self.server_name = "unknown_server_from_sentinel_context"
self.request = None
self.scope = None
self.tag = None

def __str__(self) -> str:
Expand Down Expand Up @@ -303,6 +312,7 @@ class LoggingContext:
"finished",
"request",
"tag",
"scope",
]

def __init__(
Expand All @@ -327,6 +337,7 @@ def __init__(
self.main_thread = get_thread_id()
self.request = None
self.tag = ""
self.scope: Optional["_LogContextScope"] = None

# keep track of whether we have hit the __exit__ block for this context
# (suggesting that the the thing that created the context thinks it should
Expand All @@ -340,6 +351,9 @@ def __init__(
# which request this corresponds to
self.request = self.parent_context.request

# we also track the current scope:
self.scope = self.parent_context.scope

if request is not None:
# the request param overrides the request from the parent context
self.request = request
Expand Down
25 changes: 19 additions & 6 deletions synapse/logging/opentracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,17 +251,18 @@ class _DummyTagNames:
try:
import opentracing
import opentracing.tags
from opentracing.scope_managers.contextvars import ContextVarsScopeManager

tags = opentracing.tags
except ImportError:
opentracing = None # type: ignore[assignment]
tags = _DummyTagNames # type: ignore[assignment]
ContextVarsScopeManager = None # type: ignore
try:
from jaeger_client import Config as JaegerConfig

from synapse.logging.scopecontextmanager import LogContextScopeManager
except ImportError:
JaegerConfig = None # type: ignore
LogContextScopeManager = None # type: ignore


try:
Expand Down Expand Up @@ -483,7 +484,7 @@ def init_tracer(hs: "HomeServer") -> None:
config = JaegerConfig(
config=jaeger_config,
service_name=f"{hs.config.server.server_name} {instance_name_by_type}",
scope_manager=ContextVarsScopeManager(),
scope_manager=LogContextScopeManager(),
metrics_factory=PrometheusMetricsFactory(),
)

Expand Down Expand Up @@ -683,9 +684,21 @@ def start_active_span_from_edu(

# Opentracing setters for tags, logs, etc
@only_if_tracing
def active_span() -> Optional["opentracing.Span"]:
"""Get the currently active span, if any"""
return opentracing.tracer.active_span
def active_span(
*,
tracer: Optional["opentracing.Tracer"] = None,
) -> Optional["opentracing.Span"]:
"""
Get the currently active span, if any

Args:
tracer: override the opentracing tracer. By default the global tracer is used.
"""
if tracer is None:
# use the global tracer by default
tracer = opentracing.tracer

return tracer.active_span


@ensure_active_span("set a tag")
Expand Down
161 changes: 161 additions & 0 deletions synapse/logging/scopecontextmanager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2019 The Matrix.org Foundation C.I.C.
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#

import logging
from typing import Optional

from opentracing import Scope, ScopeManager, Span

from synapse.logging.context import (
LoggingContext,
current_context,
nested_logging_context,
)

logger = logging.getLogger(__name__)


class LogContextScopeManager(ScopeManager):
"""
The LogContextScopeManager tracks the active scope in opentracing
by using the log contexts which are native to synapse. This is so
that the basic opentracing api can be used across twisted defereds.

It would be nice just to use opentracing's ContextVarsScopeManager,
but currently that doesn't work due to https://twistedmatrix.com/trac/ticket/10301.
"""

def __init__(self) -> None:
pass

@property
def active(self) -> Optional[Scope]:
"""
Returns the currently active Scope which can be used to access the
currently active Scope.span.
If there is a non-null Scope, its wrapped Span
becomes an implicit parent of any newly-created Span at
Tracer.start_active_span() time.

Return:
The Scope that is active, or None if not available.
"""
ctx = current_context()
return ctx.scope

def activate(self, span: Span, finish_on_close: bool) -> Scope:
"""
Makes a Span active.
Args
span: the span that should become active.
finish_on_close: whether Span should be automatically finished when
Scope.close() is called.

Returns:
Scope to control the end of the active period for
*span*. It is a programming error to neglect to call
Scope.close() on the returned instance.
"""

ctx = current_context()

if not ctx:
logger.error("Tried to activate scope outside of loggingcontext")
return Scope(None, span) # type: ignore[arg-type]

if ctx.scope is not None:
# start a new logging context as a child of the existing one.
# Doing so -- rather than updating the existing logcontext -- means that
# creating several concurrent spans under the same logcontext works
# correctly.
ctx = nested_logging_context("")
enter_logcontext = True
else:
# if there is no span currently associated with the current logcontext, we
# just store the scope in it.
#
# This feels a bit dubious, but it does hack around a problem where a
# span outlasts its parent logcontext (which would otherwise lead to
# "Re-starting finished log context" errors).
enter_logcontext = False

scope = _LogContextScope(self, span, ctx, enter_logcontext, finish_on_close)
ctx.scope = scope
if enter_logcontext:
ctx.__enter__()

return scope


class _LogContextScope(Scope):
"""
A custom opentracing scope, associated with a LogContext

* When the scope is closed, the logcontext's active scope is reset to None.
and - if enter_logcontext was set - the logcontext is finished too.
"""

def __init__(
self,
manager: LogContextScopeManager,
span: Span,
logcontext: LoggingContext,
enter_logcontext: bool,
finish_on_close: bool,
):
"""
Args:
manager:
the manager that is responsible for this scope.
span:
the opentracing span which this scope represents the local
lifetime for.
logcontext:
the log context to which this scope is attached.
enter_logcontext:
if True the log context will be exited when the scope is finished
finish_on_close:
if True finish the span when the scope is closed
"""
super().__init__(manager, span)
self.logcontext = logcontext
self._finish_on_close = finish_on_close
self._enter_logcontext = enter_logcontext

def __str__(self) -> str:
return f"Scope<{self.span}>"

def close(self) -> None:
active_scope = self.manager.active
if active_scope is not self:
logger.error(
"Closing scope %s which is not the currently-active one %s",
self,
active_scope,
)

if self._finish_on_close:
self.span.finish()

self.logcontext.scope = None

if self._enter_logcontext:
self.logcontext.__exit__(None, None, None)
24 changes: 21 additions & 3 deletions synapse/metrics/background_process_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@

from synapse.server import HomeServer

try:
import opentracing
except ImportError:
opentracing = None # type: ignore[assignment]


logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -225,6 +230,7 @@ def run_as_background_process(
func: Callable[..., Awaitable[Optional[R]]],
*args: Any,
bg_start_span: bool = True,
test_only_tracer: Optional["opentracing.Tracer"] = None,
**kwargs: Any,
) -> "defer.Deferred[Optional[R]]":
"""Run the given function in its own logcontext, with resource metrics
Expand All @@ -250,6 +256,8 @@ def run_as_background_process(
bg_start_span: Whether to start an opentracing span. Defaults to True.
Should only be disabled for processes that will not log to or tag
a span.
test_only_tracer: Set the OpenTracing tracer to use. This is only useful for
tests.
args: positional args for func
kwargs: keyword args for func

Expand All @@ -259,6 +267,12 @@ def run_as_background_process(
rules.
"""

# Since we track the tracing scope in the `LoggingContext`, before we move to the
# sentinel logcontext (or a new `LoggingContext`), grab the currently active
# tracing span (if any) so that we can create a cross-link to the background process
# trace.
original_active_tracing_span = active_span(tracer=test_only_tracer)

async def run() -> Optional[R]:
with _bg_metrics_lock:
count = _background_process_counts.get(desc, 0)
Expand All @@ -276,8 +290,6 @@ async def run() -> Optional[R]:
) as logging_context:
try:
if bg_start_span:
original_active_tracing_span = active_span()

# If there is already an active span (e.g. because this background
# process was started as part of handling a request for example),
# because this is a long-running background task that may serve a
Expand Down Expand Up @@ -308,6 +320,7 @@ async def run() -> Optional[R]:
# Create a root span for the background process (disconnected
# from other spans)
ignore_active_span=True,
tracer=test_only_tracer,
)

# Also add a span in the original request trace that cross-links
Expand All @@ -324,8 +337,11 @@ async def run() -> Optional[R]:
f"start_bgproc.{desc}",
child_of=original_active_tracing_span,
ignore_active_span=True,
# Points to the background process span.
# Create the `FOLLOWS_FROM` reference to the background
# process span so there is a loose coupling between the two
# traces and it's easy to jump between.
contexts=[root_tracing_scope.span.context],
tracer=test_only_tracer,
):
pass

Expand All @@ -341,6 +357,7 @@ async def run() -> Optional[R]:
# span so there is a loose coupling between the two
# traces and it's easy to jump between.
contexts=[original_active_tracing_span.context],
tracer=test_only_tracer,
)

# For easy usage down below, we create a context manager that
Expand All @@ -359,6 +376,7 @@ def combined_context_manager() -> Generator[None, None, None]:
tracing_scope = start_active_span(
f"bgproc.{desc}",
tags={SynapseTags.REQUEST_ID: str(logging_context)},
tracer=test_only_tracer,
)
else:
tracing_scope = nullcontext()
Expand Down
Loading
Loading