From cf3269b6333bbbbaf246b924066c274448e2525a Mon Sep 17 00:00:00 2001 From: Peter Onyisi Date: Mon, 8 Jul 2024 05:12:27 +0000 Subject: [PATCH 1/7] Replace list by GuardList for data locators --- servicex/dataset_group.py | 12 +++++--- servicex/servicex_client.py | 59 ++++++++++++++++++++++++++++++------ tests/test_databinder.py | 24 +++++++++++++++ tests/test_dataset_group.py | 20 ++++++++++++ tests/test_output_handler.py | 2 +- 5 files changed, 102 insertions(+), 15 deletions(-) diff --git a/servicex/dataset_group.py b/servicex/dataset_group.py index d7d94f2a..6aa8a690 100644 --- a/servicex/dataset_group.py +++ b/servicex/dataset_group.py @@ -66,25 +66,27 @@ async def as_signed_urls_async( self, display_progress: bool = True, provided_progress: Optional[Progress] = None, - ) -> List[TransformedResults]: + return_exceptions: bool = False, + ) -> List[Union[TransformedResults, BaseException]]: with ExpandableProgress(display_progress, provided_progress) as progress: self.tasks = [ d.as_signed_urls_async(provided_progress=progress) for d in self.datasets ] - return await asyncio.gather(*self.tasks) + return await asyncio.gather(*self.tasks, return_exceptions=return_exceptions) as_signed_urls = make_sync(as_signed_urls_async) async def as_files_async(self, display_progress: bool = True, - provided_progress: Optional[Progress] = None - ) -> List[TransformedResults]: + provided_progress: Optional[Progress] = None, + return_exceptions: bool = False, + ) -> List[Union[TransformedResults, BaseException]]: with ExpandableProgress(display_progress, provided_progress) as progress: self.tasks = [ d.as_files_async(provided_progress=progress) for d in self.datasets ] - return await asyncio.gather(*self.tasks) + return await asyncio.gather(*self.tasks, return_exceptions=return_exceptions) as_files = make_sync(as_files_async) diff --git a/servicex/servicex_client.py b/servicex/servicex_client.py index af61fc00..5e534b50 100644 --- a/servicex/servicex_client.py +++ b/servicex/servicex_client.py @@ -26,7 +26,7 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import logging -from typing import Optional, List, TypeVar, Any, Type, Mapping, Union +from typing import Optional, List, TypeVar, Any, Type, Mapping, Union, cast from pathlib import Path from servicex.configuration import Configuration @@ -46,11 +46,43 @@ from make_it_sync import make_sync from servicex.databinder_models import ServiceXSpec, General, Sample +from collections.abc import Sequence T = TypeVar("T") logger = logging.getLogger(__name__) +class GuardList(Sequence[T]): + def __init__(self, data: Union[Sequence[T], Exception]): + super().__init__() + self._data = data + + def valid(self) -> bool: + return not isinstance(self._data, Exception) + + def __getitem__(self, index) -> T: + if not self.valid(): + data = cast(Exception, self._data) + raise data + else: + data = cast(Sequence[T], self._data) + return data[index] + + def __len__(self): + if not self.valid(): + data = cast(Exception, self._data) + raise data + else: + data = cast(Sequence, self._data) + return len(data) + + def __repr__(self): + if self.valid(): + return repr(self._data) + else: + return f'Invalid GuardList: {repr(self._data)}' + + def _load_ServiceXSpec( config: Union[ServiceXSpec, Mapping[str, Any], str, Path] ) -> ServiceXSpec: @@ -156,11 +188,19 @@ def get_codegen(_sample: Sample, _general: General): return datasets -def _output_handler(config: ServiceXSpec, results: List[TransformedResults]): +def _output_handler(config: ServiceXSpec, requests: List[Query], + results: List[TransformedResults]): + matched_results = zip(requests, results) if config.General.Delivery == General.DeliveryEnum.SignedURLs: - out_dict = {obj.title: obj.signed_url_list for obj in results} + out_dict = {obj[0].title: GuardList(obj[1].signed_url_list + if not isinstance(obj[1], Exception) + else obj[1]) + for obj in matched_results} elif config.General.Delivery == General.DeliveryEnum.LocalCache: - out_dict = {obj.title: obj.file_list for obj in results} + out_dict = {obj[0].title: GuardList(obj[1].file_list + if not isinstance(obj[1], Exception) + else obj[1]) + for obj in matched_results} if config.General.OutputDirectory: import yaml as yl @@ -178,7 +218,8 @@ def _output_handler(config: ServiceXSpec, results: List[TransformedResults]): def deliver( config: Union[ServiceXSpec, Mapping[str, Any], str, Path], config_path: Optional[str] = None, - servicex_name: Optional[str] = None + servicex_name: Optional[str] = None, + return_exceptions: bool = True ): config = _load_ServiceXSpec(config) @@ -187,12 +228,12 @@ def deliver( group = DatasetGroup(datasets) if config.General.Delivery == General.DeliveryEnum.SignedURLs: - results = group.as_signed_urls() - return _output_handler(config, results) + results = group.as_signed_urls(return_exceptions=return_exceptions) + return _output_handler(config, datasets, results) elif config.General.Delivery == General.DeliveryEnum.LocalCache: - results = group.as_files() - return _output_handler(config, results) + results = group.as_files(return_exceptions=return_exceptions) + return _output_handler(config, datasets, results) class ServiceXClient: diff --git a/tests/test_databinder.py b/tests/test_databinder.py index 6f6c8637..e37ab716 100644 --- a/tests/test_databinder.py +++ b/tests/test_databinder.py @@ -3,6 +3,7 @@ from pydantic import ValidationError from servicex import ServiceXSpec, FileListDataset, RucioDatasetIdentifier, dataset +from servicex.query_core import ServiceXException def basic_spec(samples=None): @@ -180,6 +181,29 @@ def test_submit_mapping(transformed_result, codegen_list): deliver(spec, config_path='tests/example_config.yaml') +def test_submit_mapping_failure(transformed_result, codegen_list): + from servicex import deliver + spec = { + "Sample": [ + { + "Name": "sampleA", + "RucioDID": "user.ivukotic:user.ivukotic.single_top_tW__nominal", + "Query": "[{'treename': 'nominal'}]", + "Codegen": "uproot-raw" + } + ] + } + with patch('servicex.dataset_group.DatasetGroup.as_files', + return_value=[ServiceXException("dummy")]), \ + patch('servicex.servicex_client.ServiceXClient.get_code_generators', + return_value=codegen_list): + results = deliver(spec, config_path='tests/example_config.yaml') + assert len(results) == 1 + with pytest.raises(ServiceXException): # should expect an exception to be thrown on access + for _ in results['sampleA']: + pass + + def test_yaml(tmp_path): from servicex.servicex_client import _load_ServiceXSpec from servicex.dataset import FileList, Rucio, CERNOpenData diff --git a/tests/test_dataset_group.py b/tests/test_dataset_group.py index e184563f..7869ba44 100644 --- a/tests/test_dataset_group.py +++ b/tests/test_dataset_group.py @@ -31,6 +31,7 @@ from servicex import ResultFormat from servicex.dataset_group import DatasetGroup +from servicex.query_core import ServiceXException def test_set_result_format(mocker): @@ -57,3 +58,22 @@ async def test_as_signed_urls(mocker, transformed_result): assert len(results) == 2 assert results[0].request_id == "123-45-6789" assert results[1].request_id == "98-765-432" + + +@pytest.mark.asyncio +async def test_failure(mocker, transformed_result): + ds1 = mocker.Mock() + ds1.as_signed_urls_async = AsyncMock(return_value=transformed_result) + + ds2 = mocker.Mock() + ds2.as_signed_urls_async = AsyncMock(side_effect=ServiceXException("dummy")) + + group = DatasetGroup([ds1, ds2]) + with pytest.raises(ServiceXException): + await group.as_signed_urls_async() + + results = await group.as_signed_urls_async(return_exceptions=True) + + assert len(results) == 2 + assert results[0].request_id == "123-45-6789" + assert isinstance(results[1], ServiceXException) diff --git a/tests/test_output_handler.py b/tests/test_output_handler.py index c17a099e..eed864dd 100644 --- a/tests/test_output_handler.py +++ b/tests/test_output_handler.py @@ -17,5 +17,5 @@ def test_output_directory(tmp_path): } config = ServiceXSpec(**config) - _output_handler(config, []) + _output_handler(config, [], []) assert Path(tmp_path, "servicex_fileset.yaml").exists() From 465bff052b90ef1620b3008202dbfac84ae41e9d Mon Sep 17 00:00:00 2001 From: Peter Onyisi Date: Mon, 8 Jul 2024 05:23:19 +0000 Subject: [PATCH 2/7] Remove generics for GuardList due to Python 3.8 --- servicex/servicex_client.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/servicex/servicex_client.py b/servicex/servicex_client.py index 5e534b50..df53bce4 100644 --- a/servicex/servicex_client.py +++ b/servicex/servicex_client.py @@ -52,23 +52,23 @@ logger = logging.getLogger(__name__) -class GuardList(Sequence[T]): - def __init__(self, data: Union[Sequence[T], Exception]): +class GuardList(Sequence): + def __init__(self, data: Union[Sequence, Exception]): super().__init__() self._data = data def valid(self) -> bool: return not isinstance(self._data, Exception) - def __getitem__(self, index) -> T: + def __getitem__(self, index) -> Any: if not self.valid(): data = cast(Exception, self._data) raise data else: - data = cast(Sequence[T], self._data) + data = cast(Sequence, self._data) return data[index] - def __len__(self): + def __len__(self) -> int: if not self.valid(): data = cast(Exception, self._data) raise data From f6ac148c7737c4b4cb91432896e4c354144481d9 Mon Sep 17 00:00:00 2001 From: Peter Onyisi Date: Mon, 8 Jul 2024 05:24:18 +0000 Subject: [PATCH 3/7] Copy any input sequence --- servicex/servicex_client.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/servicex/servicex_client.py b/servicex/servicex_client.py index df53bce4..db3daaa9 100644 --- a/servicex/servicex_client.py +++ b/servicex/servicex_client.py @@ -54,8 +54,9 @@ class GuardList(Sequence): def __init__(self, data: Union[Sequence, Exception]): + import copy super().__init__() - self._data = data + self._data = copy.copy(data) def valid(self) -> bool: return not isinstance(self._data, Exception) From 18074273e7dc705b29d95d895cc2365bf1fe67ce Mon Sep 17 00:00:00 2001 From: Peter Onyisi Date: Mon, 8 Jul 2024 05:47:43 +0000 Subject: [PATCH 4/7] Coverage improvement --- tests/conftest.py | 16 +++++++++++++ tests/test_databinder.py | 51 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 66 insertions(+), 1 deletion(-) diff --git a/tests/conftest.py b/tests/conftest.py index 05b5a627..679238d3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -174,6 +174,22 @@ def transformed_result(dummy_parquet_file) -> TransformedResults: ) +@fixture +def transformed_result_signed_url() -> TransformedResults: + return TransformedResults( + hash="123-4455", + title="Test", + codegen="uproot", + request_id="123-45-6789", + submit_time=datetime.now(), + data_dir="/foo/bar", + file_list=[], + signed_url_list=['https://dummy.junk.io/1.parquet', 'https://dummy.junk.io/2.parquet'], + files=2, + result_format=ResultFormat.root, + ) + + @fixture def dummy_parquet_file(): data = {'column1': [1, 2, 3, 4], diff --git a/tests/test_databinder.py b/tests/test_databinder.py index e37ab716..d324ce7a 100644 --- a/tests/test_databinder.py +++ b/tests/test_databinder.py @@ -178,7 +178,32 @@ def test_submit_mapping(transformed_result, codegen_list): return_value=[transformed_result]), \ patch('servicex.servicex_client.ServiceXClient.get_code_generators', return_value=codegen_list): - deliver(spec, config_path='tests/example_config.yaml') + results = deliver(spec, config_path='tests/example_config.yaml') + assert list(results['sampleA']) == ['1.parquet'] + + +def test_submit_mapping_signed_urls(transformed_result_signed_url, codegen_list): + from servicex import deliver + spec = { + "General": { + "Delivery": "SignedURLs" + }, + "Sample": [ + { + "Name": "sampleA", + "RucioDID": "user.ivukotic:user.ivukotic.single_top_tW__nominal", + "Query": "[{'treename': 'nominal'}]", + "Codegen": "uproot-raw" + } + ] + } + with patch('servicex.dataset_group.DatasetGroup.as_signed_urls', + return_value=[transformed_result_signed_url]), \ + patch('servicex.servicex_client.ServiceXClient.get_code_generators', + return_value=codegen_list): + results = deliver(spec, config_path='tests/example_config.yaml') + assert list(results['sampleA']) == ['https://dummy.junk.io/1.parquet', + 'https://dummy.junk.io/2.parquet'] def test_submit_mapping_failure(transformed_result, codegen_list): @@ -204,6 +229,30 @@ def test_submit_mapping_failure(transformed_result, codegen_list): pass +def test_submit_mapping_failure_signed_urls(codegen_list): + from servicex import deliver + spec = { + "General": {"Delivery": "SignedURLs"}, + "Sample": [ + { + "Name": "sampleA", + "RucioDID": "user.ivukotic:user.ivukotic.single_top_tW__nominal", + "Query": "[{'treename': 'nominal'}]", + "Codegen": "uproot-raw" + } + ] + } + with patch('servicex.dataset_group.DatasetGroup.as_signed_urls', + return_value=[ServiceXException("dummy")]), \ + patch('servicex.servicex_client.ServiceXClient.get_code_generators', + return_value=codegen_list): + results = deliver(spec, config_path='tests/example_config.yaml', return_exceptions=False) + assert len(results) == 1 + with pytest.raises(ServiceXException): # should expect an exception to be thrown on access + for _ in results['sampleA']: + pass + + def test_yaml(tmp_path): from servicex.servicex_client import _load_ServiceXSpec from servicex.dataset import FileList, Rucio, CERNOpenData From 317115b3fd9d743ec336d067f5eb3563bba08029 Mon Sep 17 00:00:00 2001 From: Peter Onyisi Date: Mon, 8 Jul 2024 05:59:24 +0000 Subject: [PATCH 5/7] More coverage --- tests/test_dataset_group.py | 17 +++++++++++++++++ tests/test_guardlist.py | 14 ++++++++++++++ 2 files changed, 31 insertions(+) create mode 100644 tests/test_guardlist.py diff --git a/tests/test_dataset_group.py b/tests/test_dataset_group.py index 7869ba44..24161efc 100644 --- a/tests/test_dataset_group.py +++ b/tests/test_dataset_group.py @@ -60,6 +60,23 @@ async def test_as_signed_urls(mocker, transformed_result): assert results[1].request_id == "98-765-432" +@pytest.mark.asyncio +async def test_as_files(mocker, transformed_result): + ds1 = mocker.Mock() + ds1.as_files_async = AsyncMock(return_value=transformed_result) + + ds2 = mocker.Mock() + ds2.as_files_async = AsyncMock(return_value=transformed_result.model_copy( + update={"request_id": "98-765-432"})) + + group = DatasetGroup([ds1, ds2]) + results = await group.as_files_async() + + assert len(results) == 2 + assert results[0].request_id == "123-45-6789" + assert results[1].request_id == "98-765-432" + + @pytest.mark.asyncio async def test_failure(mocker, transformed_result): ds1 = mocker.Mock() diff --git a/tests/test_guardlist.py b/tests/test_guardlist.py new file mode 100644 index 00000000..6d6745b7 --- /dev/null +++ b/tests/test_guardlist.py @@ -0,0 +1,14 @@ +from servicex.servicex_client import GuardList +import pytest + + +def test_guardlist(): + gl1 = GuardList([1]) + assert str(gl1) == '[1]' + assert gl1[0] == 1 + gl2 = GuardList(ValueError()) + assert str(gl2) == 'Invalid GuardList: ValueError()' + with pytest.raises(ValueError): + gl2[0] + with pytest.raises(ValueError): + len(gl2) From 921d9c97b415aa443f476f6886a8757124d0b8f9 Mon Sep 17 00:00:00 2001 From: KyungEon Choi Date: Tue, 23 Jul 2024 11:58:33 -0500 Subject: [PATCH 6/7] Fix flake8 --- servicex/dataset_group.py | 2 +- servicex/servicex_client.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/servicex/dataset_group.py b/servicex/dataset_group.py index 5a70e515..5dd79ece 100644 --- a/servicex/dataset_group.py +++ b/servicex/dataset_group.py @@ -26,7 +26,7 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import asyncio -from typing import List, Optional +from typing import List, Optional, Union from rich.progress import Progress from servicex.query_core import Query diff --git a/servicex/servicex_client.py b/servicex/servicex_client.py index 7e99bbbf..9b894fd4 100644 --- a/servicex/servicex_client.py +++ b/servicex/servicex_client.py @@ -26,7 +26,7 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import logging -from typing import Optional, List, TypeVar, Any, Type, Mapping, Union, cast +from typing import Optional, List, TypeVar, Any, Mapping, Union, cast from pathlib import Path from servicex.configuration import Configuration From 1a4538f6f4b3293675cc8b4defd48caefd411cab Mon Sep 17 00:00:00 2001 From: KyungEon Choi Date: Tue, 23 Jul 2024 15:33:29 -0500 Subject: [PATCH 7/7] Add Exception --- servicex/servicex_client.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/servicex/servicex_client.py b/servicex/servicex_client.py index 9b894fd4..496912d8 100644 --- a/servicex/servicex_client.py +++ b/servicex/servicex_client.py @@ -151,7 +151,7 @@ def get_codegen(_sample: Sample, _general: General): def _output_handler(config: ServiceXSpec, requests: List[Query], - results: List[TransformedResults]): + results: List[Union[TransformedResults, Exception]]): matched_results = zip(requests, results) if config.General.Delivery == General.DeliveryEnum.SignedURLs: out_dict = {obj[0].title: GuardList(obj[1].signed_url_list @@ -195,6 +195,7 @@ def deliver( elif config.General.Delivery == General.DeliveryEnum.LocalCache: results = group.as_files(return_exceptions=return_exceptions) + print(results) return _output_handler(config, datasets, results)