Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions servicex/dataset_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import asyncio
from typing import List, Optional
from typing import List, Optional, Union
from rich.progress import Progress

from servicex.query_core import Query
Expand Down Expand Up @@ -64,25 +64,27 @@ async def as_signed_urls_async(
self,
display_progress: bool = True,
provided_progress: Optional[Progress] = None,
) -> List[TransformedResults]:
return_exceptions: bool = False,
) -> List[Union[TransformedResults, BaseException]]:
with ExpandableProgress(display_progress, provided_progress) as progress:
self.tasks = [
d.as_signed_urls_async(provided_progress=progress)
for d in self.datasets
]
return await asyncio.gather(*self.tasks)
return await asyncio.gather(*self.tasks, return_exceptions=return_exceptions)

as_signed_urls = make_sync(as_signed_urls_async)

async def as_files_async(self,
display_progress: bool = True,
provided_progress: Optional[Progress] = None
) -> List[TransformedResults]:
provided_progress: Optional[Progress] = None,
return_exceptions: bool = False,
) -> List[Union[TransformedResults, BaseException]]:
with ExpandableProgress(display_progress, provided_progress) as progress:
self.tasks = [
d.as_files_async(provided_progress=progress)
for d in self.datasets
]
return await asyncio.gather(*self.tasks)
return await asyncio.gather(*self.tasks, return_exceptions=return_exceptions)

as_files = make_sync(as_files_async)
61 changes: 52 additions & 9 deletions servicex/servicex_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import logging
from typing import Optional, List, TypeVar, Any, Mapping, Union
from typing import Optional, List, TypeVar, Any, Mapping, Union, cast
from pathlib import Path

from servicex.configuration import Configuration
Expand All @@ -43,11 +43,44 @@

from make_it_sync import make_sync
from servicex.databinder_models import ServiceXSpec, General, Sample
from collections.abc import Sequence

T = TypeVar("T")
logger = logging.getLogger(__name__)


class GuardList(Sequence):
    """Read-only sequence that defers a stored failure until it is accessed.

    The constructor accepts either real sequence data or an exception.  When
    an exception is held the guard is "invalid": indexing it or asking for its
    length raises the stored exception (iteration goes through
    ``__getitem__`` and therefore raises as well).  When real data is held,
    every access is forwarded to the wrapped sequence.
    """

    def __init__(self, data: Union[Sequence, BaseException]):
        """Store a shallow copy of ``data`` (a sequence or an exception)."""
        import copy
        super().__init__()
        self._data = copy.copy(data)

    def valid(self) -> bool:
        """Return True when real data is held, False when an exception is."""
        # Check against BaseException, not Exception: results collected with
        # asyncio.gather(..., return_exceptions=True) may be BaseException
        # subclasses such as CancelledError, and those must be treated as
        # failures rather than as subscriptable data.
        return not isinstance(self._data, BaseException)

    def _checked_data(self) -> Sequence:
        """Return the wrapped sequence, raising the stored exception if any."""
        if not self.valid():
            raise cast(BaseException, self._data)
        return cast(Sequence, self._data)

    def __getitem__(self, index) -> Any:
        """Return ``data[index]``; raise the stored exception when invalid."""
        return self._checked_data()[index]

    def __len__(self) -> int:
        """Return ``len(data)``; raise the stored exception when invalid."""
        return len(self._checked_data())

    def __repr__(self) -> str:
        """Never raises: an invalid guard describes its stored exception."""
        if self.valid():
            return repr(self._data)
        return f'Invalid GuardList: {repr(self._data)}'


def _load_ServiceXSpec(
config: Union[ServiceXSpec, Mapping[str, Any], str, Path]
) -> ServiceXSpec:
Expand Down Expand Up @@ -117,11 +150,19 @@ def get_codegen(_sample: Sample, _general: General):
return datasets


def _output_handler(config: ServiceXSpec, results: List[TransformedResults]):
def _output_handler(config: ServiceXSpec, requests: List[Query],
results: List[Union[TransformedResults, Exception]]):
matched_results = zip(requests, results)
if config.General.Delivery == General.DeliveryEnum.SignedURLs:
out_dict = {obj.title: obj.signed_url_list for obj in results}
out_dict = {obj[0].title: GuardList(obj[1].signed_url_list
if not isinstance(obj[1], Exception)
else obj[1])
for obj in matched_results}
elif config.General.Delivery == General.DeliveryEnum.LocalCache:
out_dict = {obj.title: obj.file_list for obj in results}
out_dict = {obj[0].title: GuardList(obj[1].file_list
if not isinstance(obj[1], Exception)
else obj[1])
for obj in matched_results}

if config.General.OutputDirectory:
import yaml as yl
Expand All @@ -139,7 +180,8 @@ def _output_handler(config: ServiceXSpec, results: List[TransformedResults]):
def deliver(
config: Union[ServiceXSpec, Mapping[str, Any], str, Path],
config_path: Optional[str] = None,
servicex_name: Optional[str] = None
servicex_name: Optional[str] = None,
return_exceptions: bool = True
):
config = _load_ServiceXSpec(config)

Expand All @@ -148,12 +190,13 @@ def deliver(
group = DatasetGroup(datasets)

if config.General.Delivery == General.DeliveryEnum.SignedURLs:
results = group.as_signed_urls()
return _output_handler(config, results)
results = group.as_signed_urls(return_exceptions=return_exceptions)
return _output_handler(config, datasets, results)

elif config.General.Delivery == General.DeliveryEnum.LocalCache:
results = group.as_files()
return _output_handler(config, results)
results = group.as_files(return_exceptions=return_exceptions)
print(results)
return _output_handler(config, datasets, results)


class ServiceXClient:
Expand Down
16 changes: 16 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,22 @@ def transformed_result(dummy_parquet_file) -> TransformedResults:
)


@fixture
def transformed_result_signed_url() -> TransformedResults:
    """Provide a TransformedResults with two signed URLs and no local files."""
    fields = {
        "hash": "123-4455",
        "title": "Test",
        "codegen": "uproot",
        "request_id": "123-45-6789",
        "submit_time": datetime.now(),
        "data_dir": "/foo/bar",
        "file_list": [],
        "signed_url_list": [
            'https://dummy.junk.io/1.parquet',
            'https://dummy.junk.io/2.parquet',
        ],
        "files": 2,
        "result_format": ResultFormat.root,
    }
    return TransformedResults(**fields)


@fixture
def dummy_parquet_file():
data = {'column1': [1, 2, 3, 4],
Expand Down
75 changes: 74 additions & 1 deletion tests/test_databinder.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from pydantic import ValidationError

from servicex import ServiceXSpec, dataset
from servicex.query_core import ServiceXException
from servicex.dataset import FileList, Rucio


Expand Down Expand Up @@ -178,7 +179,79 @@ def test_submit_mapping(transformed_result, codegen_list):
return_value=[transformed_result]), \
patch('servicex.servicex_client.ServiceXClient.get_code_generators',
return_value=codegen_list):
deliver(spec, config_path='tests/example_config.yaml')
results = deliver(spec, config_path='tests/example_config.yaml')
assert list(results['sampleA']) == ['1.parquet']


def test_submit_mapping_signed_urls(transformed_result_signed_url, codegen_list):
    """deliver() with SignedURLs delivery exposes the signed URLs per sample."""
    from servicex import deliver

    sample = {
        "Name": "sampleA",
        "RucioDID": "user.ivukotic:user.ivukotic.single_top_tW__nominal",
        "Query": "[{'treename': 'nominal'}]",
        "Codegen": "uproot-raw"
    }
    spec = {
        "General": {"Delivery": "SignedURLs"},
        "Sample": [sample],
    }

    # Replace the transform submission and the code generator lookup so no
    # network access happens.
    signed_urls_patch = patch('servicex.dataset_group.DatasetGroup.as_signed_urls',
                              return_value=[transformed_result_signed_url])
    codegen_patch = patch('servicex.servicex_client.ServiceXClient.get_code_generators',
                          return_value=codegen_list)

    with signed_urls_patch, codegen_patch:
        results = deliver(spec, config_path='tests/example_config.yaml')

    expected_urls = [
        'https://dummy.junk.io/1.parquet',
        'https://dummy.junk.io/2.parquet',
    ]
    assert list(results['sampleA']) == expected_urls


def test_submit_mapping_failure(transformed_result, codegen_list):
    """A failed sample is still returned, but raises once its entry is read."""
    from servicex import deliver

    sample = {
        "Name": "sampleA",
        "RucioDID": "user.ivukotic:user.ivukotic.single_top_tW__nominal",
        "Query": "[{'treename': 'nominal'}]",
        "Codegen": "uproot-raw"
    }
    spec = {"Sample": [sample]}

    # The patched group hands back an exception instead of a result.
    as_files_patch = patch('servicex.dataset_group.DatasetGroup.as_files',
                           return_value=[ServiceXException("dummy")])
    codegen_patch = patch('servicex.servicex_client.ServiceXClient.get_code_generators',
                          return_value=codegen_list)

    with as_files_patch, codegen_patch:
        results = deliver(spec, config_path='tests/example_config.yaml')

    assert len(results) == 1
    # should expect an exception to be thrown on access
    with pytest.raises(ServiceXException):
        for _entry in results['sampleA']:
            pass


def test_submit_mapping_failure_signed_urls(codegen_list):
    """Signed-URL delivery: a failed sample raises once its entry is read."""
    from servicex import deliver

    sample = {
        "Name": "sampleA",
        "RucioDID": "user.ivukotic:user.ivukotic.single_top_tW__nominal",
        "Query": "[{'treename': 'nominal'}]",
        "Codegen": "uproot-raw"
    }
    spec = {
        "General": {"Delivery": "SignedURLs"},
        "Sample": [sample],
    }

    # The patched group hands back an exception instead of a result.
    signed_urls_patch = patch('servicex.dataset_group.DatasetGroup.as_signed_urls',
                              return_value=[ServiceXException("dummy")])
    codegen_patch = patch('servicex.servicex_client.ServiceXClient.get_code_generators',
                          return_value=codegen_list)

    with signed_urls_patch, codegen_patch:
        results = deliver(spec, config_path='tests/example_config.yaml',
                          return_exceptions=False)

    assert len(results) == 1
    # should expect an exception to be thrown on access
    with pytest.raises(ServiceXException):
        for _entry in results['sampleA']:
            pass


def test_yaml(tmp_path):
Expand Down
37 changes: 37 additions & 0 deletions tests/test_dataset_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

from servicex import ResultFormat
from servicex.dataset_group import DatasetGroup
from servicex.query_core import ServiceXException


def test_set_result_format(mocker):
Expand All @@ -57,3 +58,39 @@ async def test_as_signed_urls(mocker, transformed_result):
assert len(results) == 2
assert results[0].request_id == "123-45-6789"
assert results[1].request_id == "98-765-432"


@pytest.mark.asyncio
async def test_as_files(mocker, transformed_result):
    """as_files_async returns one result per dataset, in dataset order."""
    second_result = transformed_result.model_copy(
        update={"request_id": "98-765-432"})

    first_ds = mocker.Mock()
    first_ds.as_files_async = AsyncMock(return_value=transformed_result)
    second_ds = mocker.Mock()
    second_ds.as_files_async = AsyncMock(return_value=second_result)

    results = await DatasetGroup([first_ds, second_ds]).as_files_async()

    assert len(results) == 2
    first, second = results
    assert first.request_id == "123-45-6789"
    assert second.request_id == "98-765-432"


@pytest.mark.asyncio
async def test_failure(mocker, transformed_result):
    """A failing dataset raises by default and is returned when requested."""
    healthy_ds = mocker.Mock()
    healthy_ds.as_signed_urls_async = AsyncMock(return_value=transformed_result)

    failing_ds = mocker.Mock()
    failing_ds.as_signed_urls_async = AsyncMock(
        side_effect=ServiceXException("dummy"))

    group = DatasetGroup([healthy_ds, failing_ds])

    # Default behaviour: the failure propagates to the caller.
    with pytest.raises(ServiceXException):
        await group.as_signed_urls_async()

    # With return_exceptions=True the failure comes back in the result list.
    results = await group.as_signed_urls_async(return_exceptions=True)

    assert len(results) == 2
    ok_result, failure = results
    assert ok_result.request_id == "123-45-6789"
    assert isinstance(failure, ServiceXException)
14 changes: 14 additions & 0 deletions tests/test_guardlist.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from servicex.servicex_client import GuardList
import pytest


def test_guardlist():
    """GuardList serves valid data and re-raises a stored exception on access."""
    good = GuardList([1])
    assert str(good) == '[1]'
    assert good[0] == 1

    bad = GuardList(ValueError())
    assert str(bad) == 'Invalid GuardList: ValueError()'
    # Both indexing and len() must surface the stored exception.
    for access in (lambda: bad[0], lambda: len(bad)):
        with pytest.raises(ValueError):
            access()
2 changes: 1 addition & 1 deletion tests/test_output_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ def test_output_directory(tmp_path):
}
config = ServiceXSpec(**config)

_output_handler(config, [])
_output_handler(config, [], [])
assert Path(tmp_path, "servicex_fileset.yaml").exists()