Skip to content

Commit d81d1c3

Browse files
committed
refactor(ipc): more general shared file
We refactor the file queue into a more generic shared string file for IPC that can be used by more components.
1 parent c426d0f commit d81d1c3

File tree

3 files changed

+213
-108
lines changed

3 files changed

+213
-108
lines changed

ddtrace/internal/_file_queue.py

Lines changed: 0 additions & 105 deletions
This file was deleted.

ddtrace/internal/ipc.py

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
from contextlib import contextmanager
2+
import os
3+
import secrets
4+
import tempfile
5+
import typing
6+
7+
from ddtrace.internal._unpatched import unpatched_open
8+
from ddtrace.internal.compat import Path
9+
from ddtrace.internal.logger import get_logger
10+
11+
12+
log = get_logger(__name__)
13+
14+
15+
MAX_FILE_SIZE = 8192
16+
17+
18+
class BaseLock:
19+
def __init__(self, file: typing.IO[typing.Any]):
20+
self.file = file
21+
22+
def acquire(self):
23+
...
24+
25+
def release(self):
26+
...
27+
28+
def __enter__(self):
29+
self.acquire()
30+
return self
31+
32+
def __exit__(self, exc_type, exc_value, exc_tb):
33+
self.release()
34+
35+
36+
try:
37+
# Unix based file locking
38+
# Availability: Unix, not Emscripten, not WASI.
39+
import fcntl
40+
41+
class BaseUnixLock(BaseLock):
42+
__acquire_mode__ = None
43+
44+
def acquire(self):
45+
if self.__acquire_mode__ is None:
46+
msg = f"Cannot use lock of type {type(self)} directly"
47+
raise ValueError(msg)
48+
49+
fcntl.lockf(self.file, self.__acquire_mode__)
50+
51+
def release(self):
52+
fcntl.lockf(self.file, fcntl.LOCK_UN)
53+
54+
class ReadLock(BaseUnixLock):
55+
__acquire_mode__ = fcntl.LOCK_SH
56+
57+
class WriteLock(BaseUnixLock):
58+
__acquire_mode__ = fcntl.LOCK_EX
59+
60+
@contextmanager
61+
def open_file(path, mode):
62+
yield unpatched_open(path, mode)
63+
64+
except ModuleNotFoundError:
65+
# Availability: Windows
66+
import _winapi
67+
import ctypes
68+
from ctypes import wintypes
69+
import msvcrt
70+
import os
71+
72+
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
73+
74+
LOCKFILE_EXCLUSIVE_LOCK = 0x00000002
75+
INVALID_HANDLE_VALUE = -1
76+
77+
class OVERLAPPED(ctypes.Structure):
78+
_fields_ = [
79+
("Internal", wintypes.LPVOID),
80+
("InternalHigh", wintypes.LPVOID),
81+
("Offset", wintypes.DWORD),
82+
("OffsetHigh", wintypes.DWORD),
83+
("hEvent", wintypes.HANDLE),
84+
]
85+
86+
kernel32.LockFileEx.argtypes = (
87+
wintypes.HANDLE,
88+
wintypes.DWORD,
89+
wintypes.DWORD,
90+
wintypes.DWORD,
91+
wintypes.DWORD,
92+
ctypes.POINTER(OVERLAPPED),
93+
)
94+
kernel32.LockFileEx.restype = wintypes.BOOL
95+
96+
kernel32.UnlockFileEx.argtypes = (
97+
wintypes.HANDLE,
98+
wintypes.DWORD,
99+
wintypes.DWORD,
100+
wintypes.DWORD,
101+
ctypes.POINTER(OVERLAPPED),
102+
)
103+
kernel32.UnlockFileEx.restype = wintypes.BOOL
104+
105+
SHARED_READ_WRITE = 0x7
106+
OPEN_ALWAYS = 4
107+
RANDOM_ACCESS = 0x10000000
108+
109+
class BaseWinLock(BaseLock):
110+
__acquire_mode__ = None
111+
112+
def __init__(self, file: typing.IO[typing.Any]):
113+
super().__init__(file)
114+
self.handle = None
115+
self.overlapped: typing.Optional[OVERLAPPED] = None
116+
117+
def acquire(self):
118+
self.handle = _winapi.CreateFile(
119+
self.file.name,
120+
_winapi.GENERIC_READ | _winapi.GENERIC_WRITE,
121+
SHARED_READ_WRITE,
122+
0,
123+
OPEN_ALWAYS,
124+
RANDOM_ACCESS,
125+
0,
126+
)
127+
128+
self.overlapped = OVERLAPPED()
129+
130+
kernel32.LockFileEx(self.handle, self.__acquire_mode__, 0, 0, 0, ctypes.byref(self.overlapped))
131+
132+
def release(self):
133+
if self.handle is None or self.overlapped is None:
134+
msg = "Lock was not acquired"
135+
raise RuntimeError(msg)
136+
137+
try:
138+
kernel32.UnlockFileEx(self.handle, 0, 0, 0, ctypes.byref(self.overlapped))
139+
finally:
140+
_winapi.CloseHandle(self.handle)
141+
142+
class ReadLock(BaseWinLock):
143+
__acquire_mode__ = 0 # Shared by default
144+
145+
class WriteLock(BaseWinLock):
146+
__acquire_mode__ = LOCKFILE_EXCLUSIVE_LOCK
147+
148+
@contextmanager
149+
def open_file(path, mode):
150+
# force all modes to be read/write binary
151+
mode = "r+b"
152+
flag = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
153+
fd_flag = os.O_RDWR | os.O_CREAT | os.O_BINARY | os.O_RANDOM
154+
155+
handle = _winapi.CreateFile(path, flag, SHARED_READ_WRITE, 0, OPEN_ALWAYS, RANDOM_ACCESS, 0)
156+
fd = msvcrt.open_osfhandle(handle, fd_flag | os.O_NOINHERIT)
157+
158+
yield unpatched_open(fd, mode)
159+
160+
_winapi.CloseHandle(handle)
161+
162+
163+
TMPDIR = Path(tempfile.gettempdir())
164+
165+
166+
class SharedStringFile:
167+
"""A simple shared-file implementation for multiprocess communication."""
168+
169+
def __init__(self) -> None:
170+
self.filename: typing.Optional[str] = str(TMPDIR / secrets.token_hex(8))
171+
172+
def put(self, data: str) -> None:
173+
"""Put a string into the file."""
174+
if self.filename is None:
175+
return
176+
177+
try:
178+
with open_file(self.filename, "ab") as f, WriteLock(f):
179+
f.seek(0, os.SEEK_END)
180+
dt = (data + "\x00").encode()
181+
if f.tell() + len(dt) <= MAX_FILE_SIZE:
182+
f.write(dt)
183+
except Exception: # nosec
184+
pass
185+
186+
def peekall(self) -> typing.List[str]:
187+
"""Peek at all strings from the file."""
188+
if self.filename is None:
189+
return []
190+
191+
try:
192+
with open_file(self.filename, "r+b") as f, ReadLock(f):
193+
return f.read().strip(b"\x00").decode().split("\x00")
194+
except Exception: # nosec
195+
return []
196+
197+
def snatchall(self) -> typing.List[str]:
198+
"""Retrieve and remove all strings from the file."""
199+
if self.filename is None:
200+
return []
201+
202+
try:
203+
with open_file(self.filename, "r+b") as f, WriteLock(f):
204+
try:
205+
return f.read().strip(b"\x00").decode().split("\x00")
206+
finally:
207+
f.seek(0)
208+
f.truncate()
209+
except Exception: # nosec
210+
return []

ddtrace/settings/_config.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -510,9 +510,9 @@ def __init__(self):
510510
self._extra_services_queue = None
511511
if self._remote_config_enabled and not in_aws_lambda():
512512
# lazy load slow import
513-
from ddtrace.internal._file_queue import File_Queue
513+
from ddtrace.internal.ipc import SharedStringFile
514514

515-
self._extra_services_queue = File_Queue()
515+
self._extra_services_queue = SharedStringFile()
516516

517517
self._unparsed_service_mapping = _get_config("DD_SERVICE_MAPPING", "")
518518
self.service_mapping = parse_tags_str(self._unparsed_service_mapping)
@@ -703,7 +703,7 @@ def _get_extra_services(self):
703703
# type: () -> set[str]
704704
if self._extra_services_queue is None:
705705
return set()
706-
self._extra_services.update(self._extra_services_queue.get_all())
706+
self._extra_services.update(set(self._extra_services_queue.snatchall()))
707707
while len(self._extra_services) > 64:
708708
self._extra_services.pop()
709709
return self._extra_services

0 commit comments

Comments
 (0)