From f24c445e9f1cdd008511f6bd656ac8550e35485e Mon Sep 17 00:00:00 2001 From: Peter Onyisi Date: Sun, 3 Aug 2025 21:47:46 -0500 Subject: [PATCH 1/3] Add models and routes for reporting dataset lookup errors --- servicex_app/servicex_app/models.py | 4 + .../resources/internal/fileset_error.py | 118 ++++++++++++++++++ servicex_app/servicex_app/routes.py | 7 ++ 3 files changed, 129 insertions(+) create mode 100644 servicex_app/servicex_app/resources/internal/fileset_error.py diff --git a/servicex_app/servicex_app/models.py b/servicex_app/servicex_app/models.py index 6e78c4827..ad2d79bb3 100644 --- a/servicex_app/servicex_app/models.py +++ b/servicex_app/servicex_app/models.py @@ -157,6 +157,7 @@ class TransformStatus(Enum): complete = ("Complete", True) fatal = ("Fatal", True) canceled = ("Canceled", True) + bad_dataset = ("Bad Dataset", True) def __init__(self, string_name, is_complete): self.string_name = string_name @@ -404,6 +405,9 @@ class DatasetStatus(str, Enum): created = "created" looking = "looking" complete = "complete" + does_not_exist = "does_not_exist" + bad_name = "bad_name" + internal_failure = "internal_failure" class Dataset(db.Model): diff --git a/servicex_app/servicex_app/resources/internal/fileset_error.py b/servicex_app/servicex_app/resources/internal/fileset_error.py new file mode 100644 index 000000000..ebe47b38c --- /dev/null +++ b/servicex_app/servicex_app/resources/internal/fileset_error.py @@ -0,0 +1,118 @@ +# Copyright (c) 2025, IRIS-HEP +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. 
+# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+from flask import request, current_app + +from servicex_app.models import ( + Dataset, + db, + TransformRequest, + TransformStatus, + DatasetStatus, +) +from servicex_app.resources.servicex_resource import ServiceXResource + +from datetime import datetime, timezone + + +class FilesetError(ServiceXResource): + @classmethod + def make_api(cls, lookup_result_processor, transformer_manager): + cls.lookup_result_processor = lookup_result_processor + cls.transformer_manager = transformer_manager + return cls + + def put(self, dataset_id): + summary = request.get_json() + dataset = Dataset.find_by_id(int(dataset_id)) + + if dataset is None: + current_app.logger.info( + "Dataset lookup error received for unknown dataset", + extra={ + "dataset_id": dataset_id, + "error-type": summary["error-type"], + "message": summary["message"], + }, + ) + return + + current_app.logger.info( + "Error in file lookup", + extra={ + "dataset_id": dataset_id, + "elapsed-time": summary["elapsed-time"], + "error-type": summary["error-type"], + "message": summary["message"], + }, + ) + + dataset.lookup_status = DatasetStatus(summary["error-type"]) + dataset.stale = True # Repeat lookup if we try again + db.session.commit() + + # shut down related transformations. 
Nothing good can come of letting them + # continue to run + namespace = current_app.config["TRANSFORMER_NAMESPACE"] + for running_request in TransformRequest.lookup_running_by_dataset_id( + int(dataset_id) + ): + running_request.status = TransformStatus.bad_dataset + running_request.finish_time = datetime.now(tz=timezone.utc) + self.transformer_manager.shutdown_transformer_job( + running_request.request_id, namespace + ) + current_app.logger.info( + "Shutting down transformer because of dataset lookup problem", + extra={ + "dataset_id": dataset_id, + "elapsed-time": summary["elapsed-time"], + "error-type": summary["error-type"], + "message": summary["message"], + "requestId": running_request.request_id, + }, + ) + + # Tell any other transform that was waiting for the lookup to complete + # not to expect to run + for pending_transform in TransformRequest.lookup_pending_on_dataset( + int(dataset_id) + ): + pending_transform.status = TransformStatus.bad_dataset + pending_transform.finish_time = datetime.now(tz=timezone.utc) + current_app.logger.info( + "Shutting down transformer because of dataset lookup problem", + extra={ + "dataset_id": dataset_id, + "elapsed-time": summary["elapsed-time"], + "error-type": summary["error-type"], + "message": summary["message"], + "requestId": pending_transform.request_id, + }, + ) + + db.session.commit() diff --git a/servicex_app/servicex_app/routes.py b/servicex_app/servicex_app/routes.py index 8fbeb1f10..148946dbd 100644 --- a/servicex_app/servicex_app/routes.py +++ b/servicex_app/servicex_app/routes.py @@ -49,6 +49,7 @@ def add_routes( from servicex_app.resources.internal.add_file_to_dataset import AddFileToDataset from servicex_app.resources.internal.fileset_complete import FilesetComplete + from servicex_app.resources.internal.fileset_error import FilesetError from servicex_app.resources.internal.transform_status import ( TransformationStatusInternal, ) @@ -183,6 +184,12 @@ def add_routes( 
"/servicex/internal/transformation//complete", ) + FilesetError.make_api(lookup_result_processor, transformer_manager) + api.add_resource( + FilesetError, + "/servicex/internal/transformation/<dataset_id>/error", + ) + TransformerFileComplete.make_api(transformer_manager) api.add_resource( TransformerFileComplete, From f4cadc67e0832cbbd0c0686f69ac5d7ba1ed5892 Mon Sep 17 00:00:00 2001 From: Peter Onyisi Date: Thu, 2 Oct 2025 21:50:25 -0500 Subject: [PATCH 2/3] Add lookup error tests, fix a couple of bugs --- .../resources/internal/fileset_error.py | 11 +- .../resources/internal/test_fileset_error.py | 142 ++++++++++++++++++ 2 files changed, 148 insertions(+), 5 deletions(-) create mode 100644 servicex_app/servicex_app_test/resources/internal/test_fileset_error.py diff --git a/servicex_app/servicex_app/resources/internal/fileset_error.py b/servicex_app/servicex_app/resources/internal/fileset_error.py index ebe47b38c..23b16cda0 100644 --- a/servicex_app/servicex_app/resources/internal/fileset_error.py +++ b/servicex_app/servicex_app/resources/internal/fileset_error.py @@ -55,11 +55,12 @@ def put(self, dataset_id): "Dataset lookup error received for unknown dataset", extra={ "dataset_id": dataset_id, + "elapsed-time": summary["elapsed-time"], "error-type": summary["error-type"], - "message": summary["message"], + "_message": summary["message"], }, ) - return + return '', 422 current_app.logger.info( "Error in file lookup", @@ -67,7 +68,7 @@ def put(self, dataset_id): "dataset_id": dataset_id, "elapsed-time": summary["elapsed-time"], "error-type": summary["error-type"], - "message": summary["message"], + "_message": summary["message"], }, ) @@ -92,7 +93,7 @@ def put(self, dataset_id): "dataset_id": dataset_id, "elapsed-time": summary["elapsed-time"], "error-type": summary["error-type"], - "message": summary["message"], + "_message": summary["message"], "requestId": running_request.request_id, }, ) @@ -110,7 +111,7 @@ def put(self, dataset_id): "dataset_id": dataset_id, 
"elapsed-time": summary["elapsed-time"], "error-type": summary["error-type"], - "message": summary["message"], + "_message": summary["message"], "requestId": pending_transform.request_id, }, ) diff --git a/servicex_app/servicex_app_test/resources/internal/test_fileset_error.py b/servicex_app/servicex_app_test/resources/internal/test_fileset_error.py new file mode 100644 index 000000000..c820c1b14 --- /dev/null +++ b/servicex_app/servicex_app_test/resources/internal/test_fileset_error.py @@ -0,0 +1,142 @@ +# Copyright (c) 2019, IRIS-HEP +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +from datetime import timezone, datetime + +from servicex_app import LookupResultProcessor, TransformerManager +from servicex_app_test.resource_test_base import ResourceTestBase + +from servicex_app.models import ( + DatasetStatus, + Dataset, + TransformRequest, + TransformStatus, +) +from pytest import fixture, mark + + +class TestFilesetError(ResourceTestBase): + @fixture + def mock_find_dataset_by_id(self, mocker): + dm = mocker.Mock() + dm.dataset = Dataset( + name="rucio://my-did?files=1", + did_finder="rucio", + lookup_status=DatasetStatus.looking, + last_used=datetime.now(tz=timezone.utc), + last_updated=datetime.fromtimestamp(0), + ) + + dm.name = "rucio://my-did?files=1" + dm.id = 42 + + mock_find_by_id = mocker.patch.object(Dataset, "find_by_id", return_value=dm) + return mock_find_by_id + + @mark.parametrize("error", + ["does_not_exist", "bad_name", "internal_failure"]) + def test_put_fileset_error(self, mocker, mock_find_dataset_by_id, error): + dataset = mock_find_dataset_by_id.return_value + + pending_request = TransformRequest() + pending_request.status = TransformStatus.pending_lookup + mock_lookup_pending = mocker.patch.object( + TransformRequest, + "lookup_pending_on_dataset", + return_value=[pending_request], + ) + + lookup_request = TransformRequest() + lookup_request.status = TransformStatus.lookup + mock_lookup_running = mocker.patch.object( + TransformRequest, + "lookup_running_by_dataset_id", + 
return_value=[lookup_request], + ) + mock_processor = mocker.MagicMock(LookupResultProcessor) + mock_transformer_manager = mocker.MagicMock(TransformerManager) + mock_transformer_manager.shutdown_transformer_job = mocker.Mock() + + client = self._test_client(lookup_result_processor=mock_processor, + transformation_manager=mock_transformer_manager) + + response = client.put( + "/servicex/internal/transformation/1234/error", + json={ + "elapsed-time": 0, + "error-type": error, + "message": "honk" + }, + ) + assert response.status_code == 200 + mock_find_dataset_by_id.assert_called_once_with(1234) + assert dataset.lookup_status == DatasetStatus(error) + assert dataset.stale + + mock_lookup_pending.assert_called_once_with(1234) + mock_lookup_running.assert_called_once_with(1234) + assert pending_request.status == TransformStatus.bad_dataset + assert lookup_request.status == TransformStatus.bad_dataset + + def test_put_fileset_error_invalid_did(self, mocker): + pending_request = TransformRequest() + pending_request.status = TransformStatus.pending_lookup + mock_lookup_pending = mocker.patch.object( + TransformRequest, + "lookup_pending_on_dataset", + return_value=[pending_request], + ) + + lookup_request = TransformRequest() + lookup_request.status = TransformStatus.lookup + mock_lookup_running = mocker.patch.object( + TransformRequest, + "lookup_running_by_dataset_id", + return_value=[lookup_request], + ) + + mock_find_dataset_by_id = mocker.patch.object(Dataset, "find_by_id", return_value=None) + mock_processor = mocker.MagicMock(LookupResultProcessor) + mock_transformer_manager = mocker.MagicMock(TransformerManager) + mock_transformer_manager.shutdown_transformer_job = mocker.Mock() + + client = self._test_client(lookup_result_processor=mock_processor, + transformation_manager=mock_transformer_manager) + + response = client.put( + "/servicex/internal/transformation/1234/error", + json={ + "elapsed-time": 0, + "error-type": "bad_dataset", + "message": "honk" + }, + ) 
+ assert response.status_code == 422 + mock_find_dataset_by_id.assert_called_once_with(1234) + + mock_lookup_pending.assert_not_called() + mock_lookup_running.assert_not_called() From 0539275a46decd6700bc8818f7dba7c2b7b3f24f Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 3 Oct 2025 02:51:25 +0000 Subject: [PATCH 3/3] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../resources/internal/fileset_error.py | 2 +- .../resources/internal/test_fileset_error.py | 31 +++++++++---------- 2 files changed, 15 insertions(+), 18 deletions(-) diff --git a/servicex_app/servicex_app/resources/internal/fileset_error.py b/servicex_app/servicex_app/resources/internal/fileset_error.py index 23b16cda0..ee16a30c5 100644 --- a/servicex_app/servicex_app/resources/internal/fileset_error.py +++ b/servicex_app/servicex_app/resources/internal/fileset_error.py @@ -60,7 +60,7 @@ def put(self, dataset_id): "_message": summary["message"], }, ) - return '', 422 + return "", 422 current_app.logger.info( "Error in file lookup", diff --git a/servicex_app/servicex_app_test/resources/internal/test_fileset_error.py b/servicex_app/servicex_app_test/resources/internal/test_fileset_error.py index c820c1b14..679234eac 100644 --- a/servicex_app/servicex_app_test/resources/internal/test_fileset_error.py +++ b/servicex_app/servicex_app_test/resources/internal/test_fileset_error.py @@ -57,8 +57,7 @@ def mock_find_dataset_by_id(self, mocker): mock_find_by_id = mocker.patch.object(Dataset, "find_by_id", return_value=dm) return mock_find_by_id - @mark.parametrize("error", - ["does_not_exist", "bad_name", "internal_failure"]) + @mark.parametrize("error", ["does_not_exist", "bad_name", "internal_failure"]) def test_put_fileset_error(self, mocker, mock_find_dataset_by_id, error): dataset = mock_find_dataset_by_id.return_value @@ -81,16 +80,14 @@ def test_put_fileset_error(self, mocker, 
mock_find_dataset_by_id, error): mock_transformer_manager = mocker.MagicMock(TransformerManager) mock_transformer_manager.shutdown_transformer_job = mocker.Mock() - client = self._test_client(lookup_result_processor=mock_processor, - transformation_manager=mock_transformer_manager) + client = self._test_client( + lookup_result_processor=mock_processor, + transformation_manager=mock_transformer_manager, + ) response = client.put( "/servicex/internal/transformation/1234/error", - json={ - "elapsed-time": 0, - "error-type": error, - "message": "honk" - }, + json={"elapsed-time": 0, "error-type": error, "message": "honk"}, ) assert response.status_code == 200 mock_find_dataset_by_id.assert_called_once_with(1234) @@ -119,21 +116,21 @@ def test_put_fileset_error_invalid_did(self, mocker): return_value=[lookup_request], ) - mock_find_dataset_by_id = mocker.patch.object(Dataset, "find_by_id", return_value=None) + mock_find_dataset_by_id = mocker.patch.object( + Dataset, "find_by_id", return_value=None + ) mock_processor = mocker.MagicMock(LookupResultProcessor) mock_transformer_manager = mocker.MagicMock(TransformerManager) mock_transformer_manager.shutdown_transformer_job = mocker.Mock() - client = self._test_client(lookup_result_processor=mock_processor, - transformation_manager=mock_transformer_manager) + client = self._test_client( + lookup_result_processor=mock_processor, + transformation_manager=mock_transformer_manager, + ) response = client.put( "/servicex/internal/transformation/1234/error", - json={ - "elapsed-time": 0, - "error-type": "bad_dataset", - "message": "honk" - }, + json={"elapsed-time": 0, "error-type": "bad_dataset", "message": "honk"}, ) assert response.status_code == 422 mock_find_dataset_by_id.assert_called_once_with(1234)