From 856a7c440de4215238e5df8022df5d185c1014dd Mon Sep 17 00:00:00 2001 From: Aaron Abbott Date: Tue, 16 Sep 2025 04:03:20 +0000 Subject: [PATCH 1/4] Add fsspec gen ai upload hook --- docs-requirements.txt | 3 + docs/conf.py | 1 + docs/instrumentation-genai/util.rst | 4 + tox.ini | 2 +- util/opentelemetry-util-genai/pyproject.toml | 8 +- .../util/genai/_fsspec_upload.py | 222 ++++++++++++++++++ .../util/genai/environment_variables.py | 23 ++ .../test-requirements.txt | 3 +- .../tests/test_fsspec_upload.py | 219 +++++++++++++++++ 9 files changed, 480 insertions(+), 5 deletions(-) create mode 100644 util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload.py create mode 100644 util/opentelemetry-util-genai/tests/test_fsspec_upload.py diff --git a/docs-requirements.txt b/docs-requirements.txt index cc246fe221..afd03672d0 100644 --- a/docs-requirements.txt +++ b/docs-requirements.txt @@ -9,6 +9,9 @@ pymemcache~=1.3 # Required by conf django>=2.2 +# Require by opentelemetry-util-genai +fsspec>=2025.9.0 + # Required by instrumentation and exporter packages aio_pika~=7.2.0 aiohttp~=3.0 diff --git a/docs/conf.py b/docs/conf.py index 47ad43def2..0f45647dc4 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -130,6 +130,7 @@ None, ), "redis": ("https://redis.readthedocs.io/en/latest/", None), + "fsspec": ("https://filesystem-spec.readthedocs.io/en/latest/", None), } # http://www.sphinx-doc.org/en/master/config.html#confval-nitpicky diff --git a/docs/instrumentation-genai/util.rst b/docs/instrumentation-genai/util.rst index d55f2d1bf2..2ea0852e3c 100644 --- a/docs/instrumentation-genai/util.rst +++ b/docs/instrumentation-genai/util.rst @@ -25,3 +25,7 @@ OpenTelemetry Python - GenAI Util :members: :undoc-members: :show-inheritance: + +.. automodule:: opentelemetry.util.genai._fsspec_upload + :members: + :show-inheritance: diff --git a/tox.ini b/tox.ini index 379735d408..e2d2e3a6c2 100644 --- a/tox.ini +++ b/tox.ini @@ -1060,7 +1060,7 @@ deps = {[testenv]test_deps} {toxinidir}/opentelemetry-instrumentation {toxinidir}/util/opentelemetry-util-http - {toxinidir}/util/opentelemetry-util-genai + {toxinidir}/util/opentelemetry-util-genai[fsspec] {toxinidir}/instrumentation-genai/opentelemetry-instrumentation-vertexai[instruments] {toxinidir}/instrumentation-genai/opentelemetry-instrumentation-google-genai[instruments] {toxinidir}/instrumentation/opentelemetry-instrumentation-aiokafka[instruments] diff --git a/util/opentelemetry-util-genai/pyproject.toml b/util/opentelemetry-util-genai/pyproject.toml index 280da37d58..9e371c1a1d 100644 --- a/util/opentelemetry-util-genai/pyproject.toml +++ b/util/opentelemetry-util-genai/pyproject.toml @@ -30,10 +30,12 @@ dependencies = [ "opentelemetry-api>=1.31.0", ] +[project.entry-points.opentelemetry_genai_upload_hook] +fsspec = "opentelemetry.util.genai._fsspec_upload:fsspec_upload_hook" + [project.optional-dependencies] -test = [ - "pytest>=7.0.0", -] +test = ["pytest>=7.0.0"] +fsspec = ["fsspec>=2025.9.0"] [project.urls] Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/util/opentelemetry-util-genai" diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload.py new file mode 100644 index 0000000000..60ea51e2c6 --- /dev/null +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload.py @@ -0,0 +1,222 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import json +import logging +import posixpath +import threading +from concurrent.futures import Future, ThreadPoolExecutor +from dataclasses import asdict, dataclass +from functools import partial +from os import environ +from typing import Any, Callable, Literal, TextIO, cast +from uuid import uuid4 + +from opentelemetry._logs import LogRecord +from opentelemetry.trace import Span +from opentelemetry.util.genai import types +from opentelemetry.util.genai.environment_variables import ( + OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH, +) +from opentelemetry.util.genai.upload_hook import UploadHook, _NoOpUploadHook + +# If fsspec is not installed the hook will be a no-op. +try: + import fsspec +except ImportError: + fsspec = None + +_logger = logging.getLogger(__name__) + + +@dataclass +class Completion: + inputs: list[types.InputMessage] + outputs: list[types.OutputMessage] + system_instruction: list[types.MessagePart] + + +@dataclass +class CompletionRefs: + inputs_ref: str + outputs_ref: str + system_instruction_ref: str + + +JsonEncodeable = list[dict[str, Any]] + +# mapping of upload path to function computing upload data dict +UploadData = dict[str, Callable[[], JsonEncodeable]] + +if fsspec is not None: + # save a copy for the type checker + fsspec_copy = fsspec + + def fsspec_open(urlpath: str, mode: Literal["w"]) -> TextIO: + """typed wrapper around `fsspec.open`""" + return cast(TextIO, fsspec_copy.open(urlpath, mode)) # pyright: ignore[reportUnknownMemberType] + + class FsspecUploader: + """Implements uploading GenAI completions to a generic backend using fsspec + + This class is used by the `BatchUploadHook` to upload completions to an external + storage. + """ + + def upload( # pylint: disable=no-self-use + self, + path: str, + json_encodeable: Callable[[], JsonEncodeable], + ) -> None: + with fsspec_open(path, "w") as file: + json.dump(json_encodeable(), file, separators=(",", ":")) + + class FsspecUploadHook(UploadHook): + """An upload hook using ``fsspec`` to upload to external storage + + This function can be used as the + :func:`~opentelemetry.util.genai.upload_hook.load_upload_hook` implementation by + setting :envvar:`OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK` to ``fsspec``. + :envvar:`OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH` must be configured to specify the + base path for uploads. + + Both the ``fsspec`` and ``opentelemetry-sdk`` packages should be installed, or a no-op + implementation will be used instead. You can use ``opentelemetry-util-genai[fsspec]`` + as a requirement to achieve this. + """ + + def __init__( + self, + *, + uploader: FsspecUploader, + base_path: str, + max_size: int = 20, + ) -> None: + self._base_path = base_path + self._uploader = uploader + self._max_size = max_size + + # Use a ThreadPoolExecutor for its queueing and thread management. The semaphore + # limits the number of queued tasks. If the queue is full, data will be dropped. + self._executor = ThreadPoolExecutor(max_workers=max_size) + self._semaphore = threading.BoundedSemaphore(max_size) + + def _submit_all(self, upload_data: UploadData) -> None: + def done(future: Future[None]) -> None: + self._semaphore.release() + + try: + future.result() + except Exception: # pylint: disable=broad-except + _logger.exception("fsspec uploader failed") + + for path, json_encodeable in upload_data.items(): + # could not acquire, drop data + if not self._semaphore.acquire(blocking=False): # pylint: disable=consider-using-with + _logger.warning( + "fsspec upload queue is full, dropping upload %s", + path, + ) + return + + try: + fut = self._executor.submit( + self._uploader.upload, path, json_encodeable + ) + fut.add_done_callback(done) + except RuntimeError: + _logger.info( + "attempting to upload file after FsspecUploadHook.shutdown() was already called" + ) + break + + def calculate_ref_path(self, completion: Completion) -> CompletionRefs: + """Generate a path to the reference + + The default implementation uses :func:`~uuid.uuid4` to generate a random name per completion. + """ + # TODO: experimental with using the trace_id and span_id, or fetching + # gen_ai.response.id from the active span. + + uuid_str = str(uuid4()) + return CompletionRefs( + inputs_ref=posixpath.join( + self._base_path, f"{uuid_str}_inputs.json" + ), + outputs_ref=posixpath.join( + self._base_path, f"{uuid_str}_outputs.json" + ), + system_instruction_ref=posixpath.join( + self._base_path, f"{uuid_str}_system_instruction.json" + ), + ) + + def upload( + self, + *, + inputs: list[types.InputMessage], + outputs: list[types.OutputMessage], + system_instruction: list[types.MessagePart], + span: Span | None = None, + log_record: LogRecord | None = None, + **kwargs: Any, + ) -> None: + completion = Completion( + inputs=inputs, + outputs=outputs, + system_instruction=system_instruction, + ) + # generate the paths to upload to + ref_names = self.calculate_ref_path(completion) + + def to_dict( + dataclass_list: list[types.InputMessage] + | list[types.OutputMessage] + | list[types.MessagePart], + ) -> list[dict[str, Any]]: + return [asdict(dc) for dc in dataclass_list] + + self._submit_all( + { + ref_names.inputs_ref: partial(to_dict, completion.inputs), + ref_names.outputs_ref: partial( + to_dict, completion.outputs + ), + ref_names.system_instruction_ref: partial( + to_dict, completion.system_instruction + ), + }, + ) + + # TODO: stamp the refs on telemetry + + def shutdown(self) -> None: + # TODO: support timeout + self._executor.shutdown() + + def fsspec_upload_hook() -> UploadHook: + base_path = environ.get(OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH) + if not base_path: + return _NoOpUploadHook() + + return FsspecUploadHook( + uploader=FsspecUploader(), + base_path=base_path, + ) +else: + + def fsspec_upload_hook() -> UploadHook: + return _NoOpUploadHook() diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/environment_variables.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/environment_variables.py index 01a175b6c7..f876dd1845 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/environment_variables.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/environment_variables.py @@ -21,4 +21,27 @@ ) """ .. envvar:: OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK + +The only known value is ``fsspec``, which +""" + +OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH = ( + "OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH" +) +""" +.. envvar:: OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH + +An :func:`fsspec.open` compatible URI/path for uploading prompts and responses. Can be a local +path like ``file:./prompts`` or a cloud storage URI such as ``gs://my_bucket``. For more +information, see + +* `Instantiate a file-system + `_ for supported values and how to + install support for additional backend implementations. +* `Configuration + `_ for + configuring a backend with environment variables. +* `URL Chaining + `_ for advanced + use cases. """ diff --git a/util/opentelemetry-util-genai/test-requirements.txt b/util/opentelemetry-util-genai/test-requirements.txt index 91d59f42f5..34a1ad14a2 100644 --- a/util/opentelemetry-util-genai/test-requirements.txt +++ b/util/opentelemetry-util-genai/test-requirements.txt @@ -1,2 +1,3 @@ pytest==7.4.4 --e opentelemetry-instrumentation \ No newline at end of file +fsspec==2025.9.0 +-e opentelemetry-instrumentation diff --git a/util/opentelemetry-util-genai/tests/test_fsspec_upload.py b/util/opentelemetry-util-genai/tests/test_fsspec_upload.py new file mode 100644 index 0000000000..bb9e1205d6 --- /dev/null +++ b/util/opentelemetry-util-genai/tests/test_fsspec_upload.py @@ -0,0 +1,219 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# pylint: disable=import-outside-toplevel,no-name-in-module + +import importlib +import logging +import sys +import threading +from dataclasses import asdict +from typing import Any +from unittest import TestCase +from unittest.mock import MagicMock, patch + +import fsspec +from fsspec.implementations.memory import MemoryFileSystem + +from opentelemetry.test.test_base import TestBase +from opentelemetry.util.genai import types +from opentelemetry.util.genai._fsspec_upload import ( + FsspecUploader, + FsspecUploadHook, +) +from opentelemetry.util.genai.upload_hook import ( + _NoOpUploadHook, + load_upload_hook, +) + +# Use MemoryFileSystem for testing +# https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.implementations.memory.MemoryFileSystem +BASE_PATH = "memory://" + + +@patch.dict( + "os.environ", + { + "OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK": "fsspec", + "OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH": BASE_PATH, + }, + clear=True, +) +class TestFsspecEntryPoint(TestCase): + def test_fsspec_entry_point(self): + self.assertIsInstance(load_upload_hook(), FsspecUploadHook) + + def test_fsspec_entry_point_no_fsspec(self): + """Tests that the a no-op uploader is used when fsspec is not installed""" + + from opentelemetry.util.genai import _fsspec_upload + + # Simulate fsspec imports failing + with patch.dict(sys.modules, {"fsspec": None}): + importlib.reload(_fsspec_upload) + self.assertIsInstance(load_upload_hook(), _NoOpUploadHook) + + +MAXSIZE = 5 +FAKE_INPUTS = [ + types.InputMessage( + role="user", + parts=[types.Text(content="What is the capital of France?")], + ), +] +FAKE_OUTPUTS = [ + types.OutputMessage( + role="assistant", + parts=[types.Text(content="Paris")], + finish_reason="stop", + ), +] +FAKE_SYSTEM_INSTRUCTION = [types.Text(content="You are a helpful assistant.")] + + +class TestFsspecUploadHook(TestCase): + def setUp(self): + self.mock_uploader = MagicMock(spec=FsspecUploader) + self.hook = FsspecUploadHook( + uploader=self.mock_uploader, + base_path=BASE_PATH, + max_size=MAXSIZE, + ) + + def tearDown(self) -> None: + self.hook.shutdown() + + def test_shutdown_no_items(self): + self.hook.shutdown() + + def test_upload_then_shutdown(self): + self.hook.upload( + inputs=FAKE_INPUTS, + outputs=FAKE_OUTPUTS, + system_instruction=FAKE_SYSTEM_INSTRUCTION, + ) + # all items should be consumed + self.hook.shutdown() + + self.assertEqual( + self.mock_uploader.upload.call_count, + 3, + "should have uploaded 3 files", + ) + + def test_upload_blocked(self): + unblock_upload = threading.Event() + + def blocked_upload(*args: Any) -> None: + unblock_upload.wait() + + self.mock_uploader.upload = MagicMock(wraps=blocked_upload) + + # fill the queue + for _ in range(MAXSIZE): + self.hook.upload( + inputs=FAKE_INPUTS, + outputs=FAKE_OUTPUTS, + system_instruction=FAKE_SYSTEM_INSTRUCTION, + ) + + self.assertLessEqual( + self.mock_uploader.upload.call_count, + MAXSIZE, + f"uploader should only be called {MAXSIZE=} times", + ) + + with self.assertLogs(level=logging.WARNING) as logs: + self.hook.upload( + inputs=FAKE_INPUTS, + outputs=FAKE_OUTPUTS, + system_instruction=FAKE_SYSTEM_INSTRUCTION, + ) + + self.assertIn( + "fsspec upload queue is full, dropping upload", logs.output[0] + ) + + unblock_upload.set() + + def test_failed_upload_logs(self): + def failing_upload(*args: Any) -> None: + raise RuntimeError("failed to upload") + + self.mock_uploader.upload = MagicMock(wraps=failing_upload) + + with self.assertLogs(level=logging.ERROR) as logs: + self.hook.upload( + inputs=FAKE_INPUTS, + outputs=FAKE_OUTPUTS, + system_instruction=FAKE_SYSTEM_INSTRUCTION, + ) + self.hook.shutdown() + + self.assertIn("fsspec uploader failed", logs.output[0]) + + def test_upload_after_shutdown_logs(self): + self.hook.shutdown() + with self.assertLogs(level=logging.INFO) as logs: + self.hook.upload( + inputs=FAKE_INPUTS, + outputs=FAKE_OUTPUTS, + system_instruction=FAKE_SYSTEM_INSTRUCTION, + ) + self.assertEqual(len(logs.output), 1) + self.assertIn( + "attempting to upload file after FsspecUploadHook.shutdown() was already called", + logs.output[0], + ) + + +class FsspecUploaderTest(TestCase): + def test_upload(self): + uploader = FsspecUploader() + uploader.upload( + "memory://my_path", + lambda: [asdict(fake_input) for fake_input in FAKE_INPUTS], + ) + + with fsspec.open("memory://my_path", "r") as file: + self.assertEqual( + file.read(), + '[{"role":"user","parts":[{"content":"What is the capital of France?","type":"text"}]}]', + ) + + +class TestFsspecUploadHookIntegration(TestBase): + def setUp(self): + MemoryFileSystem.store.clear() + + def assert_fsspec_equal(self, path: str, value: str) -> None: + with fsspec.open(path, "r") as file: + self.assertEqual(file.read(), value) + + def test_upload_completions(self): + hook = FsspecUploadHook( + uploader=FsspecUploader(), + base_path=BASE_PATH, + ) + hook.upload( + inputs=FAKE_INPUTS, + outputs=FAKE_OUTPUTS, + system_instruction=FAKE_SYSTEM_INSTRUCTION, + ) + hook.shutdown() + + fs = fsspec.open(BASE_PATH).fs + self.assertEqual(len(fs.ls(BASE_PATH)), 3) + # TODO: test stamped telemetry From 816ffd34f3355769a6592b69c0c6afbca2229408 Mon Sep 17 00:00:00 2001 From: Aaron Abbott Date: Tue, 16 Sep 2025 20:39:54 +0000 Subject: [PATCH 2/4] split up into sub-package to make imports cleaner --- .../util/genai/_fsspec_upload.py | 222 ------------------ .../util/genai/_fsspec_upload/__init__.py | 43 ++++ .../util/genai/_fsspec_upload/fsspec_hook.py | 195 +++++++++++++++ .../tests/test_fsspec_upload.py | 7 +- 4 files changed, 243 insertions(+), 224 deletions(-) delete mode 100644 util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload.py create mode 100644 util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/__init__.py create mode 100644 util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload.py deleted file mode 100644 index 60ea51e2c6..0000000000 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload.py +++ /dev/null @@ -1,222 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import annotations - -import json -import logging -import posixpath -import threading -from concurrent.futures import Future, ThreadPoolExecutor -from dataclasses import asdict, dataclass -from functools import partial -from os import environ -from typing import Any, Callable, Literal, TextIO, cast -from uuid import uuid4 - -from opentelemetry._logs import LogRecord -from opentelemetry.trace import Span -from opentelemetry.util.genai import types -from opentelemetry.util.genai.environment_variables import ( - OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH, -) -from opentelemetry.util.genai.upload_hook import UploadHook, _NoOpUploadHook - -# If fsspec is not installed the hook will be a no-op. -try: - import fsspec -except ImportError: - fsspec = None - -_logger = logging.getLogger(__name__) - - -@dataclass -class Completion: - inputs: list[types.InputMessage] - outputs: list[types.OutputMessage] - system_instruction: list[types.MessagePart] - - -@dataclass -class CompletionRefs: - inputs_ref: str - outputs_ref: str - system_instruction_ref: str - - -JsonEncodeable = list[dict[str, Any]] - -# mapping of upload path to function computing upload data dict -UploadData = dict[str, Callable[[], JsonEncodeable]] - -if fsspec is not None: - # save a copy for the type checker - fsspec_copy = fsspec - - def fsspec_open(urlpath: str, mode: Literal["w"]) -> TextIO: - """typed wrapper around `fsspec.open`""" - return cast(TextIO, fsspec_copy.open(urlpath, mode)) # pyright: ignore[reportUnknownMemberType] - - class FsspecUploader: - """Implements uploading GenAI completions to a generic backend using fsspec - - This class is used by the `BatchUploadHook` to upload completions to an external - storage. - """ - - def upload( # pylint: disable=no-self-use - self, - path: str, - json_encodeable: Callable[[], JsonEncodeable], - ) -> None: - with fsspec_open(path, "w") as file: - json.dump(json_encodeable(), file, separators=(",", ":")) - - class FsspecUploadHook(UploadHook): - """An upload hook using ``fsspec`` to upload to external storage - - This function can be used as the - :func:`~opentelemetry.util.genai.upload_hook.load_upload_hook` implementation by - setting :envvar:`OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK` to ``fsspec``. - :envvar:`OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH` must be configured to specify the - base path for uploads. - - Both the ``fsspec`` and ``opentelemetry-sdk`` packages should be installed, or a no-op - implementation will be used instead. You can use ``opentelemetry-util-genai[fsspec]`` - as a requirement to achieve this. - """ - - def __init__( - self, - *, - uploader: FsspecUploader, - base_path: str, - max_size: int = 20, - ) -> None: - self._base_path = base_path - self._uploader = uploader - self._max_size = max_size - - # Use a ThreadPoolExecutor for its queueing and thread management. The semaphore - # limits the number of queued tasks. If the queue is full, data will be dropped. - self._executor = ThreadPoolExecutor(max_workers=max_size) - self._semaphore = threading.BoundedSemaphore(max_size) - - def _submit_all(self, upload_data: UploadData) -> None: - def done(future: Future[None]) -> None: - self._semaphore.release() - - try: - future.result() - except Exception: # pylint: disable=broad-except - _logger.exception("fsspec uploader failed") - - for path, json_encodeable in upload_data.items(): - # could not acquire, drop data - if not self._semaphore.acquire(blocking=False): # pylint: disable=consider-using-with - _logger.warning( - "fsspec upload queue is full, dropping upload %s", - path, - ) - return - - try: - fut = self._executor.submit( - self._uploader.upload, path, json_encodeable - ) - fut.add_done_callback(done) - except RuntimeError: - _logger.info( - "attempting to upload file after FsspecUploadHook.shutdown() was already called" - ) - break - - def calculate_ref_path(self, completion: Completion) -> CompletionRefs: - """Generate a path to the reference - - The default implementation uses :func:`~uuid.uuid4` to generate a random name per completion. - """ - # TODO: experimental with using the trace_id and span_id, or fetching - # gen_ai.response.id from the active span. - - uuid_str = str(uuid4()) - return CompletionRefs( - inputs_ref=posixpath.join( - self._base_path, f"{uuid_str}_inputs.json" - ), - outputs_ref=posixpath.join( - self._base_path, f"{uuid_str}_outputs.json" - ), - system_instruction_ref=posixpath.join( - self._base_path, f"{uuid_str}_system_instruction.json" - ), - ) - - def upload( - self, - *, - inputs: list[types.InputMessage], - outputs: list[types.OutputMessage], - system_instruction: list[types.MessagePart], - span: Span | None = None, - log_record: LogRecord | None = None, - **kwargs: Any, - ) -> None: - completion = Completion( - inputs=inputs, - outputs=outputs, - system_instruction=system_instruction, - ) - # generate the paths to upload to - ref_names = self.calculate_ref_path(completion) - - def to_dict( - dataclass_list: list[types.InputMessage] - | list[types.OutputMessage] - | list[types.MessagePart], - ) -> list[dict[str, Any]]: - return [asdict(dc) for dc in dataclass_list] - - self._submit_all( - { - ref_names.inputs_ref: partial(to_dict, completion.inputs), - ref_names.outputs_ref: partial( - to_dict, completion.outputs - ), - ref_names.system_instruction_ref: partial( - to_dict, completion.system_instruction - ), - }, - ) - - # TODO: stamp the refs on telemetry - - def shutdown(self) -> None: - # TODO: support timeout - self._executor.shutdown() - - def fsspec_upload_hook() -> UploadHook: - base_path = environ.get(OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH) - if not base_path: - return _NoOpUploadHook() - - return FsspecUploadHook( - uploader=FsspecUploader(), - base_path=base_path, - ) -else: - - def fsspec_upload_hook() -> UploadHook: - return _NoOpUploadHook() diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/__init__.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/__init__.py new file mode 100644 index 0000000000..9727a6082b --- /dev/null +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/__init__.py @@ -0,0 +1,43 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from os import environ + +from opentelemetry.util.genai.environment_variables import ( + OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH, +) +from opentelemetry.util.genai.upload_hook import UploadHook, _NoOpUploadHook + + +def fsspec_upload_hook() -> UploadHook: + # If fsspec is not installed the hook will be a no-op. + try: + # pylint: disable=import-outside-toplevel + from opentelemetry.util.genai._fsspec_upload.fsspec_hook import ( + FsspecUploader, + FsspecUploadHook, + ) + except ImportError: + return _NoOpUploadHook() + + base_path = environ.get(OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH) + if not base_path: + return _NoOpUploadHook() + + return FsspecUploadHook( + uploader=FsspecUploader(), + base_path=base_path, + ) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py new file mode 100644 index 0000000000..125ea05e8c --- /dev/null +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py @@ -0,0 +1,195 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from __future__ import annotations + +import json +import logging +import posixpath +import threading +from concurrent.futures import Future, ThreadPoolExecutor +from dataclasses import asdict, dataclass +from functools import partial +from typing import Any, Callable, Literal, TextIO, cast +from uuid import uuid4 + +import fsspec + +from opentelemetry._logs import LogRecord +from opentelemetry.trace import Span +from opentelemetry.util.genai import types +from opentelemetry.util.genai.upload_hook import UploadHook + +_logger = logging.getLogger(__name__) + + +@dataclass +class Completion: + inputs: list[types.InputMessage] + outputs: list[types.OutputMessage] + system_instruction: list[types.MessagePart] + + +@dataclass +class CompletionRefs: + inputs_ref: str + outputs_ref: str + system_instruction_ref: str + + +JsonEncodeable = list[dict[str, Any]] + +# mapping of upload path to function computing upload data dict +UploadData = dict[str, Callable[[], JsonEncodeable]] + + +def fsspec_open(urlpath: str, mode: Literal["w"]) -> TextIO: + """typed wrapper around `fsspec.open`""" + return cast(TextIO, fsspec.open(urlpath, mode)) # pyright: ignore[reportUnknownMemberType] + + +class FsspecUploader: + """Implements uploading GenAI completions to a generic backend using fsspec + + This class is used by the `BatchUploadHook` to upload completions to an external + storage. + """ + + def upload( # pylint: disable=no-self-use + self, + path: str, + json_encodeable: Callable[[], JsonEncodeable], + ) -> None: + with fsspec_open(path, "w") as file: + json.dump(json_encodeable(), file, separators=(",", ":")) + + +class FsspecUploadHook(UploadHook): + """An upload hook using ``fsspec`` to upload to external storage + + This function can be used as the + :func:`~opentelemetry.util.genai.upload_hook.load_upload_hook` implementation by + setting :envvar:`OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK` to ``fsspec``. + :envvar:`OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH` must be configured to specify the + base path for uploads. + + Both the ``fsspec`` and ``opentelemetry-sdk`` packages should be installed, or a no-op + implementation will be used instead. You can use ``opentelemetry-util-genai[fsspec]`` + as a requirement to achieve this. + """ + + def __init__( + self, + *, + uploader: FsspecUploader, + base_path: str, + max_size: int = 20, + ) -> None: + self._base_path = base_path + self._uploader = uploader + self._max_size = max_size + + # Use a ThreadPoolExecutor for its queueing and thread management. The semaphore + # limits the number of queued tasks. If the queue is full, data will be dropped. + self._executor = ThreadPoolExecutor(max_workers=max_size) + self._semaphore = threading.BoundedSemaphore(max_size) + + def _submit_all(self, upload_data: UploadData) -> None: + def done(future: Future[None]) -> None: + self._semaphore.release() + + try: + future.result() + except Exception: # pylint: disable=broad-except + _logger.exception("fsspec uploader failed") + + for path, json_encodeable in upload_data.items(): + # could not acquire, drop data + if not self._semaphore.acquire(blocking=False): # pylint: disable=consider-using-with + _logger.warning( + "fsspec upload queue is full, dropping upload %s", + path, + ) + return + + try: + fut = self._executor.submit( + self._uploader.upload, path, json_encodeable + ) + fut.add_done_callback(done) + except RuntimeError: + _logger.info( + "attempting to upload file after FsspecUploadHook.shutdown() was already called" + ) + break + + def _calculate_ref_path(self) -> CompletionRefs: + # TODO: experimental with using the trace_id and span_id, or fetching + # gen_ai.response.id from the active span. + + uuid_str = str(uuid4()) + return CompletionRefs( + inputs_ref=posixpath.join( + self._base_path, f"{uuid_str}_inputs.json" + ), + outputs_ref=posixpath.join( + self._base_path, f"{uuid_str}_outputs.json" + ), + system_instruction_ref=posixpath.join( + self._base_path, f"{uuid_str}_system_instruction.json" + ), + ) + + def upload( + self, + *, + inputs: list[types.InputMessage], + outputs: list[types.OutputMessage], + system_instruction: list[types.MessagePart], + span: Span | None = None, + log_record: LogRecord | None = None, + **kwargs: Any, + ) -> None: + completion = Completion( + inputs=inputs, + outputs=outputs, + system_instruction=system_instruction, + ) + # generate the paths to upload to + ref_names = self._calculate_ref_path() + + def to_dict( + dataclass_list: list[types.InputMessage] + | list[types.OutputMessage] + | list[types.MessagePart], + ) -> JsonEncodeable: + return [asdict(dc) for dc in dataclass_list] + + self._submit_all( + { + # Use partial to defer as much as possible to the background threads + ref_names.inputs_ref: partial(to_dict, completion.inputs), + ref_names.outputs_ref: partial(to_dict, completion.outputs), + ref_names.system_instruction_ref: partial( + to_dict, completion.system_instruction + ), + }, + ) + + # TODO: stamp the refs on telemetry + + def shutdown(self) -> None: + # TODO: support timeout + self._executor.shutdown() diff --git a/util/opentelemetry-util-genai/tests/test_fsspec_upload.py b/util/opentelemetry-util-genai/tests/test_fsspec_upload.py index bb9e1205d6..a4a03f6122 100644 --- a/util/opentelemetry-util-genai/tests/test_fsspec_upload.py +++ b/util/opentelemetry-util-genai/tests/test_fsspec_upload.py @@ -29,7 +29,7 @@ from opentelemetry.test.test_base import TestBase from opentelemetry.util.genai import types -from opentelemetry.util.genai._fsspec_upload import ( +from opentelemetry.util.genai._fsspec_upload.fsspec_hook import ( FsspecUploader, FsspecUploadHook, ) @@ -61,7 +61,10 @@ def test_fsspec_entry_point_no_fsspec(self): from opentelemetry.util.genai import _fsspec_upload # Simulate fsspec imports failing - with patch.dict(sys.modules, {"fsspec": None}): + with patch.dict( + sys.modules, + {"opentelemetry.util.genai._fsspec_upload.fsspec_hook": None}, + ): importlib.reload(_fsspec_upload) self.assertIsInstance(load_upload_hook(), _NoOpUploadHook) From b7bbeaec6bc93b2150f4151b71ae92f8e1435e1a Mon Sep 17 00:00:00 2001 From: Aaron Abbott Date: Tue, 16 Sep 2025 21:28:25 +0000 Subject: [PATCH 3/4] Get rid of FsspecUploader separate class --- .../util/genai/_fsspec_upload/__init__.py | 6 +---- .../util/genai/_fsspec_upload/fsspec_hook.py | 27 ++++++------------- .../tests/test_fsspec_upload.py | 20 +++++++------- 3 files changed, 19 insertions(+), 34 deletions(-) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/__init__.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/__init__.py index 9727a6082b..210dba3dcd 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/__init__.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/__init__.py @@ -27,7 +27,6 @@ def fsspec_upload_hook() -> UploadHook: try: # pylint: disable=import-outside-toplevel from opentelemetry.util.genai._fsspec_upload.fsspec_hook import ( - FsspecUploader, FsspecUploadHook, ) except ImportError: @@ -37,7 +36,4 @@ def fsspec_upload_hook() -> UploadHook: if not base_path: return _NoOpUploadHook() - return FsspecUploadHook( - uploader=FsspecUploader(), - base_path=base_path, - ) + return FsspecUploadHook(base_path=base_path) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py index 125ea05e8c..8af72c5909 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py @@ -60,22 +60,6 @@ def fsspec_open(urlpath: str, mode: Literal["w"]) -> TextIO: return cast(TextIO, fsspec.open(urlpath, mode)) # pyright: ignore[reportUnknownMemberType] -class FsspecUploader: - """Implements uploading GenAI completions to a generic backend using fsspec - - This class is used by the `BatchUploadHook` to upload completions to an external - storage. - """ - - def upload( # pylint: disable=no-self-use - self, - path: str, - json_encodeable: Callable[[], JsonEncodeable], - ) -> None: - with fsspec_open(path, "w") as file: - json.dump(json_encodeable(), file, separators=(",", ":")) - - class FsspecUploadHook(UploadHook): """An upload hook using ``fsspec`` to upload to external storage @@ -93,12 +77,10 @@ class FsspecUploadHook(UploadHook): def __init__( self, *, - uploader: FsspecUploader, base_path: str, max_size: int = 20, ) -> None: self._base_path = base_path - self._uploader = uploader self._max_size = max_size # Use a ThreadPoolExecutor for its queueing and thread management. The semaphore @@ -126,7 +108,7 @@ def done(future: Future[None]) -> None: try: fut = self._executor.submit( - self._uploader.upload, path, json_encodeable + self._do_upload, path, json_encodeable ) fut.add_done_callback(done) except RuntimeError: @@ -152,6 +134,13 @@ def _calculate_ref_path(self) -> CompletionRefs: ), ) + @staticmethod + def _do_upload( + path: str, json_encodeable: Callable[[], JsonEncodeable] + ) -> None: + with fsspec_open(path, "w") as file: + json.dump(json_encodeable(), file, separators=(",", ":")) + def upload( self, *, diff --git a/util/opentelemetry-util-genai/tests/test_fsspec_upload.py b/util/opentelemetry-util-genai/tests/test_fsspec_upload.py index a4a03f6122..5526221a38 100644 --- a/util/opentelemetry-util-genai/tests/test_fsspec_upload.py +++ b/util/opentelemetry-util-genai/tests/test_fsspec_upload.py @@ -30,7 +30,6 @@ from opentelemetry.test.test_base import TestBase from opentelemetry.util.genai import types from opentelemetry.util.genai._fsspec_upload.fsspec_hook import ( - FsspecUploader, FsspecUploadHook, ) from opentelemetry.util.genai.upload_hook import ( @@ -88,15 +87,18 @@ def test_fsspec_entry_point_no_fsspec(self): class TestFsspecUploadHook(TestCase): def setUp(self): - self.mock_uploader = MagicMock(spec=FsspecUploader) + self._fsspec_patcher = patch( + "opentelemetry.util.genai._fsspec_upload.fsspec_hook.fsspec" + ) + self.mock_fsspec = self._fsspec_patcher.start() self.hook = FsspecUploadHook( - uploader=self.mock_uploader, base_path=BASE_PATH, max_size=MAXSIZE, ) def tearDown(self) -> None: self.hook.shutdown() + self._fsspec_patcher.stop() def test_shutdown_no_items(self): self.hook.shutdown() @@ -111,7 +113,7 @@ def test_upload_then_shutdown(self): self.hook.shutdown() self.assertEqual( - self.mock_uploader.upload.call_count, + self.mock_fsspec.open.call_count, 3, "should have uploaded 3 files", ) @@ -122,7 +124,7 @@ def test_upload_blocked(self): def blocked_upload(*args: Any) -> None: unblock_upload.wait() - self.mock_uploader.upload = MagicMock(wraps=blocked_upload) + self.mock_fsspec.open = MagicMock(wraps=blocked_upload) # fill the queue for _ in range(MAXSIZE): @@ -133,7 +135,7 @@ def blocked_upload(*args: Any) -> None: ) self.assertLessEqual( - self.mock_uploader.upload.call_count, + self.mock_fsspec.open.call_count, MAXSIZE, f"uploader should only be called {MAXSIZE=} times", ) @@ -155,7 +157,7 @@ def test_failed_upload_logs(self): def failing_upload(*args: Any) -> None: raise RuntimeError("failed to upload") - self.mock_uploader.upload = MagicMock(wraps=failing_upload) + self.mock_fsspec.open = MagicMock(wraps=failing_upload) with self.assertLogs(level=logging.ERROR) as logs: self.hook.upload( @@ -184,8 +186,7 @@ def test_upload_after_shutdown_logs(self): class FsspecUploaderTest(TestCase): def test_upload(self): - uploader = FsspecUploader() - uploader.upload( + FsspecUploadHook._do_upload( "memory://my_path", lambda: [asdict(fake_input) for fake_input in FAKE_INPUTS], ) @@ -207,7 +208,6 @@ def assert_fsspec_equal(self, path: str, value: str) -> None: def test_upload_completions(self): hook = FsspecUploadHook( - uploader=FsspecUploader(), base_path=BASE_PATH, ) hook.upload( From 2f21804542cd7202ccb153a949f813c5e3d23f62 Mon Sep 17 00:00:00 2001 From: Aaron Abbott Date: Wed, 17 Sep 2025 17:17:35 +0000 Subject: [PATCH 4/4] comments, clean up doc strings --- .../opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py | 2 +- .../src/opentelemetry/util/genai/environment_variables.py | 4 +--- util/opentelemetry-util-genai/tests/test_fsspec_upload.py | 5 +++-- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py index 8af72c5909..9bfbc864f0 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/fsspec_hook.py @@ -104,7 +104,7 @@ def done(future: Future[None]) -> None: "fsspec upload queue is full, dropping upload %s", path, ) - return + continue try: fut = self._executor.submit( diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/environment_variables.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/environment_variables.py index f876dd1845..69c4419ae3 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/environment_variables.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/environment_variables.py @@ -21,8 +21,6 @@ ) """ .. envvar:: OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK - -The only known value is ``fsspec``, which """ OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH = ( @@ -32,7 +30,7 @@ .. envvar:: OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH An :func:`fsspec.open` compatible URI/path for uploading prompts and responses. Can be a local -path like ``file:./prompts`` or a cloud storage URI such as ``gs://my_bucket``. For more +path like ``/path/to/prompts`` or a cloud storage URI such as ``gs://my_bucket``. For more information, see * `Instantiate a file-system diff --git a/util/opentelemetry-util-genai/tests/test_fsspec_upload.py b/util/opentelemetry-util-genai/tests/test_fsspec_upload.py index 5526221a38..de55e28263 100644 --- a/util/opentelemetry-util-genai/tests/test_fsspec_upload.py +++ b/util/opentelemetry-util-genai/tests/test_fsspec_upload.py @@ -121,10 +121,11 @@ def test_upload_then_shutdown(self): def test_upload_blocked(self): unblock_upload = threading.Event() - def blocked_upload(*args: Any) -> None: + def blocked_upload(*args: Any): unblock_upload.wait() + return MagicMock() - self.mock_fsspec.open = MagicMock(wraps=blocked_upload) + self.mock_fsspec.open.side_effect = blocked_upload # fill the queue for _ in range(MAXSIZE):