From d62f8fcdaa6771708bd3f88aab2131dbcd37e59f Mon Sep 17 00:00:00 2001 From: "Gabriele N. Tornetta" Date: Wed, 29 Oct 2025 16:03:17 +0000 Subject: [PATCH] refactor(ipc): more general shared file We refactor the file queue into a more generic shared string file for IPC that can be used by more components. --- ddtrace/internal/_file_queue.py | 105 ------------- ddtrace/internal/ipc.py | 146 ++++++++++++++++++ ddtrace/settings/_config.py | 6 +- .../service_name/test_extra_services_names.py | 2 +- tests/profiling/suitespec.yml | 1 - tests/suitespec.yml | 2 +- 6 files changed, 151 insertions(+), 111 deletions(-) delete mode 100644 ddtrace/internal/_file_queue.py create mode 100644 ddtrace/internal/ipc.py diff --git a/ddtrace/internal/_file_queue.py b/ddtrace/internal/_file_queue.py deleted file mode 100644 index 0cb59861ff3..00000000000 --- a/ddtrace/internal/_file_queue.py +++ /dev/null @@ -1,105 +0,0 @@ -import os -import os.path -import secrets -import tempfile -import typing - -from ddtrace.internal._unpatched import unpatched_open -from ddtrace.internal.logger import get_logger - - -log = get_logger(__name__) - - -MAX_FILE_SIZE = 8192 - -try: - # Unix based file locking - # Availability: Unix, not Emscripten, not WASI. - import fcntl - - def lock(f): - fcntl.lockf(f, fcntl.LOCK_EX) - - def unlock(f): - fcntl.lockf(f, fcntl.LOCK_UN) - - def open_file(path, mode): - return unpatched_open(path, mode) - -except ModuleNotFoundError: - # Availability: Windows - import msvcrt - - def lock(f): - # You need to seek to the beginning of the file before locking it - f.seek(0) - msvcrt.locking(f.fileno(), msvcrt.LK_RLCK, MAX_FILE_SIZE) - - def unlock(f): - # You need to seek to the same position of the file when you locked before unlocking it - f.seek(0) - msvcrt.locking(f.fileno(), msvcrt.LK_UNLCK, MAX_FILE_SIZE) - - def open_file(path, mode): - import _winapi - - # force all modes to be read/write binary - mode = "r+b" - flag = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE - fd_flag = os.O_RDWR | os.O_CREAT | os.O_BINARY | os.O_RANDOM - SHARED_READ_WRITE = 0x7 - OPEN_ALWAYS = 4 - RANDOM_ACCESS = 0x10000000 - handle = _winapi.CreateFile(path, flag, SHARED_READ_WRITE, 0, OPEN_ALWAYS, RANDOM_ACCESS, 0) - fd = msvcrt.open_osfhandle(handle, fd_flag | os.O_NOINHERIT) - return unpatched_open(fd, mode) - - -class File_Queue: - """A simple file-based queue implementation for multiprocess communication.""" - - def __init__(self) -> None: - try: - self._directory: typing.Optional[str] = tempfile.gettempdir() - self.filename: typing.Optional[str] = os.path.join(self._directory, secrets.token_hex(8)) - except Exception as e: - info = f"Failed to create a temporary file for the file queue. {e}" - log.debug(info) - self._directory = None - self.filename = None - - def put(self, data: str) -> None: - """Push a string to the queue.""" - if self.filename is None: - return - try: - with open_file(self.filename, "ab") as f: - lock(f) - f.seek(0, os.SEEK_END) - dt = (data + "\x00").encode() - if f.tell() + len(dt) <= MAX_FILE_SIZE: - f.write(dt) - unlock(f) - except Exception: # nosec - pass - - def get_all(self) -> typing.Set[str]: - """Pop all unique strings from the queue.""" - if self.filename is None: - return set() - try: - with open_file(self.filename, "r+b") as f: - lock(f) - f.seek(0) - data = f.read().decode() - f.seek(0) - f.truncate() - unlock(f) - if data: - res = data.split("\x00") - res.pop() - return set(res) - except Exception: # nosec - pass - return set() diff --git a/ddtrace/internal/ipc.py b/ddtrace/internal/ipc.py new file mode 100644 index 00000000000..61bfa2f8e60 --- /dev/null +++ b/ddtrace/internal/ipc.py @@ -0,0 +1,146 @@ +from contextlib import contextmanager +import os +import secrets +import tempfile +import typing + +from ddtrace.internal._unpatched import unpatched_open +from ddtrace.internal.compat import Path +from ddtrace.internal.logger import get_logger + + +log = get_logger(__name__) + + +MAX_FILE_SIZE = 8192 + + +class BaseLock: + def __init__(self, file: typing.IO[typing.Any]): + self.file = file + + def acquire(self): + ... + + def release(self): + ... + + def __enter__(self): + self.acquire() + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + self.release() + + +try: + # Unix based file locking + # Availability: Unix, not Emscripten, not WASI. + import fcntl + + class BaseUnixLock(BaseLock): + __acquire_mode__: typing.Optional[int] = None + + def acquire(self): + if self.__acquire_mode__ is None: + msg = f"Cannot use lock of type {type(self)} directly" + raise ValueError(msg) + + fcntl.lockf(self.file, self.__acquire_mode__) + + def release(self): + fcntl.lockf(self.file, fcntl.LOCK_UN) + + class ReadLock(BaseUnixLock): + __acquire_mode__ = fcntl.LOCK_SH + + class WriteLock(BaseUnixLock): + __acquire_mode__ = fcntl.LOCK_EX + + @contextmanager + def open_file(path, mode): + yield unpatched_open(path, mode) + +except ModuleNotFoundError: + # Availability: Windows + import msvcrt + + class BaseWinLock(BaseLock): + def acquire(self): + f = self.file + f.seek(0) + msvcrt.locking(f.fileno(), msvcrt.LK_RLCK, MAX_FILE_SIZE) + + def release(self): + f = self.file + f.seek(0) + msvcrt.locking(f.fileno(), msvcrt.LK_UNLCK, MAX_FILE_SIZE) + + ReadLock = WriteLock = BaseWinLock # type: ignore + + def open_file(path, mode): + import _winapi + + # force all modes to be read/write binary + mode = "r+b" + flag = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE + fd_flag = os.O_RDWR | os.O_CREAT | os.O_BINARY | os.O_RANDOM + SHARED_READ_WRITE = 0x7 + OPEN_ALWAYS = 4 + RANDOM_ACCESS = 0x10000000 + handle = _winapi.CreateFile(path, flag, SHARED_READ_WRITE, 0, OPEN_ALWAYS, RANDOM_ACCESS, 0) + fd = msvcrt.open_osfhandle(handle, fd_flag | os.O_NOINHERIT) + return unpatched_open(fd, mode) + + +TMPDIR = Path(tempfile.gettempdir()) + + +class SharedStringFile: + """A simple shared-file implementation for multiprocess communication.""" + + def __init__(self) -> None: + self.filename: typing.Optional[str] = str(TMPDIR / secrets.token_hex(8)) + + def put(self, data: str) -> None: + """Put a string into the file.""" + if self.filename is None: + return + + try: + with open_file(self.filename, "ab") as f, WriteLock(f): + f.seek(0, os.SEEK_END) + dt = (data + "\x00").encode() + if f.tell() + len(dt) <= MAX_FILE_SIZE: + f.write(dt) + except Exception: # nosec + pass + + def peekall(self) -> typing.List[str]: + """Peek at all strings from the file.""" + if self.filename is None: + return [] + + try: + with open_file(self.filename, "r+b") as f, ReadLock(f): + f.seek(0) + return f.read().strip(b"\x00").decode().split("\x00") + except Exception: # nosec + return [] + + def snatchall(self) -> typing.List[str]: + """Retrieve and remove all strings from the file.""" + if self.filename is None: + return [] + + try: + with open_file(self.filename, "r+b") as f, WriteLock(f): + f.seek(0) + strings = f.read().strip(b"\x00").decode().split("\x00") + + f.seek(0) + f.truncate() + + return strings + except Exception: # nosec + return [] diff --git a/ddtrace/settings/_config.py b/ddtrace/settings/_config.py index 96c66bb5fce..75786e02796 100644 --- a/ddtrace/settings/_config.py +++ b/ddtrace/settings/_config.py @@ -510,9 +510,9 @@ def __init__(self): self._extra_services_queue = None if self._remote_config_enabled and not in_aws_lambda(): # lazy load slow import - from ddtrace.internal._file_queue import File_Queue + from ddtrace.internal.ipc import SharedStringFile - self._extra_services_queue = File_Queue() + self._extra_services_queue = SharedStringFile() self._unparsed_service_mapping = _get_config("DD_SERVICE_MAPPING", "") self.service_mapping = parse_tags_str(self._unparsed_service_mapping) @@ -703,7 +703,7 @@ def _get_extra_services(self): # type: () -> set[str] if self._extra_services_queue is None: return set() - self._extra_services.update(self._extra_services_queue.get_all()) + self._extra_services.update(set(self._extra_services_queue.snatchall()) - {""}) while len(self._extra_services) > 64: self._extra_services.pop() return self._extra_services diff --git a/tests/internal/service_name/test_extra_services_names.py b/tests/internal/service_name/test_extra_services_names.py index 4535f6fa4bc..5311b854c69 100644 --- a/tests/internal/service_name/test_extra_services_names.py +++ b/tests/internal/service_name/test_extra_services_names.py @@ -56,7 +56,7 @@ def test_config_extra_service_names_duplicates(run_python_code_in_subprocess): extra_services = ddtrace.config._get_extra_services() extra_services.discard("sqlite") # coverage -assert extra_services == {"extra_service_1"} +assert extra_services == {"extra_service_1"}, extra_services """ env = os.environ.copy() diff --git a/tests/profiling/suitespec.yml b/tests/profiling/suitespec.yml index 8c49696c48a..0e59edda6a5 100644 --- a/tests/profiling/suitespec.yml +++ b/tests/profiling/suitespec.yml @@ -34,7 +34,6 @@ components: core: - ddtrace/internal/__init__.py - ddtrace/internal/_exceptions.py - - ddtrace/internal/_file_queue.py - ddtrace/internal/_rand.pyi - ddtrace/internal/_rand.pyx - ddtrace/internal/_stdint.h diff --git a/tests/suitespec.yml b/tests/suitespec.yml index 45ff7bfd95b..76f60908b8f 100644 --- a/tests/suitespec.yml +++ b/tests/suitespec.yml @@ -37,7 +37,6 @@ components: core: - ddtrace/internal/__init__.py - ddtrace/internal/_exceptions.py - - ddtrace/internal/_file_queue.py - ddtrace/internal/_rand.pyi - ddtrace/internal/_rand.pyx - ddtrace/internal/_stdint.h @@ -54,6 +53,7 @@ components: - ddtrace/internal/forksafe.py - ddtrace/internal/gitmetadata.py - ddtrace/internal/glob_matching.py + - ddtrace/internal/ipc.py - ddtrace/internal/logger.py - ddtrace/_logger.py - ddtrace/internal/hostname.py