Skip to content

Commit 9853def

Browse files
authored
Merge pull request #413 from ssl-hep/3.0-guard-list
Use "guard list" for `deliver()`
2 parents f4b9e55 + 1a4538f commit 9853def

File tree

7 files changed

+202
-17
lines changed

7 files changed

+202
-17
lines changed

servicex/dataset_group.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
2727
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2828
import asyncio
29-
from typing import List, Optional
29+
from typing import List, Optional, Union
3030
from rich.progress import Progress
3131

3232
from servicex.query_core import Query
@@ -64,25 +64,27 @@ async def as_signed_urls_async(
6464
self,
6565
display_progress: bool = True,
6666
provided_progress: Optional[Progress] = None,
67-
) -> List[TransformedResults]:
67+
return_exceptions: bool = False,
68+
) -> List[Union[TransformedResults, BaseException]]:
6869
with ExpandableProgress(display_progress, provided_progress) as progress:
6970
self.tasks = [
7071
d.as_signed_urls_async(provided_progress=progress)
7172
for d in self.datasets
7273
]
73-
return await asyncio.gather(*self.tasks)
74+
return await asyncio.gather(*self.tasks, return_exceptions=return_exceptions)
7475

7576
as_signed_urls = make_sync(as_signed_urls_async)
7677

7778
async def as_files_async(self,
7879
display_progress: bool = True,
79-
provided_progress: Optional[Progress] = None
80-
) -> List[TransformedResults]:
80+
provided_progress: Optional[Progress] = None,
81+
return_exceptions: bool = False,
82+
) -> List[Union[TransformedResults, BaseException]]:
8183
with ExpandableProgress(display_progress, provided_progress) as progress:
8284
self.tasks = [
8385
d.as_files_async(provided_progress=progress)
8486
for d in self.datasets
8587
]
86-
return await asyncio.gather(*self.tasks)
88+
return await asyncio.gather(*self.tasks, return_exceptions=return_exceptions)
8789

8890
as_files = make_sync(as_files_async)

servicex/servicex_client.py

Lines changed: 52 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
2727
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2828
import logging
29-
from typing import Optional, List, TypeVar, Any, Mapping, Union
29+
from typing import Optional, List, TypeVar, Any, Mapping, Union, cast
3030
from pathlib import Path
3131

3232
from servicex.configuration import Configuration
@@ -43,11 +43,44 @@
4343

4444
from make_it_sync import make_sync
4545
from servicex.databinder_models import ServiceXSpec, General, Sample
46+
from collections.abc import Sequence
4647

4748
T = TypeVar("T")
4849
logger = logging.getLogger(__name__)
4950

5051

52+
class GuardList(Sequence):
53+
def __init__(self, data: Union[Sequence, Exception]):
54+
import copy
55+
super().__init__()
56+
self._data = copy.copy(data)
57+
58+
def valid(self) -> bool:
59+
return not isinstance(self._data, Exception)
60+
61+
def __getitem__(self, index) -> Any:
62+
if not self.valid():
63+
data = cast(Exception, self._data)
64+
raise data
65+
else:
66+
data = cast(Sequence, self._data)
67+
return data[index]
68+
69+
def __len__(self) -> int:
70+
if not self.valid():
71+
data = cast(Exception, self._data)
72+
raise data
73+
else:
74+
data = cast(Sequence, self._data)
75+
return len(data)
76+
77+
def __repr__(self):
78+
if self.valid():
79+
return repr(self._data)
80+
else:
81+
return f'Invalid GuardList: {repr(self._data)}'
82+
83+
5184
def _load_ServiceXSpec(
5285
config: Union[ServiceXSpec, Mapping[str, Any], str, Path]
5386
) -> ServiceXSpec:
@@ -117,11 +150,19 @@ def get_codegen(_sample: Sample, _general: General):
117150
return datasets
118151

119152

120-
def _output_handler(config: ServiceXSpec, results: List[TransformedResults]):
153+
def _output_handler(config: ServiceXSpec, requests: List[Query],
154+
results: List[Union[TransformedResults, Exception]]):
155+
matched_results = zip(requests, results)
121156
if config.General.Delivery == General.DeliveryEnum.SignedURLs:
122-
out_dict = {obj.title: obj.signed_url_list for obj in results}
157+
out_dict = {obj[0].title: GuardList(obj[1].signed_url_list
158+
if not isinstance(obj[1], Exception)
159+
else obj[1])
160+
for obj in matched_results}
123161
elif config.General.Delivery == General.DeliveryEnum.LocalCache:
124-
out_dict = {obj.title: obj.file_list for obj in results}
162+
out_dict = {obj[0].title: GuardList(obj[1].file_list
163+
if not isinstance(obj[1], Exception)
164+
else obj[1])
165+
for obj in matched_results}
125166

126167
if config.General.OutputDirectory:
127168
import yaml as yl
@@ -139,7 +180,8 @@ def _output_handler(config: ServiceXSpec, results: List[TransformedResults]):
139180
def deliver(
140181
config: Union[ServiceXSpec, Mapping[str, Any], str, Path],
141182
config_path: Optional[str] = None,
142-
servicex_name: Optional[str] = None
183+
servicex_name: Optional[str] = None,
184+
return_exceptions: bool = True
143185
):
144186
config = _load_ServiceXSpec(config)
145187

@@ -148,12 +190,13 @@ def deliver(
148190
group = DatasetGroup(datasets)
149191

150192
if config.General.Delivery == General.DeliveryEnum.SignedURLs:
151-
results = group.as_signed_urls()
152-
return _output_handler(config, results)
193+
results = group.as_signed_urls(return_exceptions=return_exceptions)
194+
return _output_handler(config, datasets, results)
153195

154196
elif config.General.Delivery == General.DeliveryEnum.LocalCache:
155-
results = group.as_files()
156-
return _output_handler(config, results)
197+
results = group.as_files(return_exceptions=return_exceptions)
198+
print(results)
199+
return _output_handler(config, datasets, results)
157200

158201

159202
class ServiceXClient:

tests/conftest.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,22 @@ def transformed_result(dummy_parquet_file) -> TransformedResults:
178178
)
179179

180180

181+
@fixture
182+
def transformed_result_signed_url() -> TransformedResults:
183+
return TransformedResults(
184+
hash="123-4455",
185+
title="Test",
186+
codegen="uproot",
187+
request_id="123-45-6789",
188+
submit_time=datetime.now(),
189+
data_dir="/foo/bar",
190+
file_list=[],
191+
signed_url_list=['https://dummy.junk.io/1.parquet', 'https://dummy.junk.io/2.parquet'],
192+
files=2,
193+
result_format=ResultFormat.root,
194+
)
195+
196+
181197
@fixture
182198
def dummy_parquet_file():
183199
data = {'column1': [1, 2, 3, 4],

tests/test_databinder.py

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from pydantic import ValidationError
44

55
from servicex import ServiceXSpec, dataset
6+
from servicex.query_core import ServiceXException
67
from servicex.dataset import FileList, Rucio
78

89

@@ -178,7 +179,79 @@ def test_submit_mapping(transformed_result, codegen_list):
178179
return_value=[transformed_result]), \
179180
patch('servicex.servicex_client.ServiceXClient.get_code_generators',
180181
return_value=codegen_list):
181-
deliver(spec, config_path='tests/example_config.yaml')
182+
results = deliver(spec, config_path='tests/example_config.yaml')
183+
assert list(results['sampleA']) == ['1.parquet']
184+
185+
186+
def test_submit_mapping_signed_urls(transformed_result_signed_url, codegen_list):
187+
from servicex import deliver
188+
spec = {
189+
"General": {
190+
"Delivery": "SignedURLs"
191+
},
192+
"Sample": [
193+
{
194+
"Name": "sampleA",
195+
"RucioDID": "user.ivukotic:user.ivukotic.single_top_tW__nominal",
196+
"Query": "[{'treename': 'nominal'}]",
197+
"Codegen": "uproot-raw"
198+
}
199+
]
200+
}
201+
with patch('servicex.dataset_group.DatasetGroup.as_signed_urls',
202+
return_value=[transformed_result_signed_url]), \
203+
patch('servicex.servicex_client.ServiceXClient.get_code_generators',
204+
return_value=codegen_list):
205+
results = deliver(spec, config_path='tests/example_config.yaml')
206+
assert list(results['sampleA']) == ['https://dummy.junk.io/1.parquet',
207+
'https://dummy.junk.io/2.parquet']
208+
209+
210+
def test_submit_mapping_failure(transformed_result, codegen_list):
211+
from servicex import deliver
212+
spec = {
213+
"Sample": [
214+
{
215+
"Name": "sampleA",
216+
"RucioDID": "user.ivukotic:user.ivukotic.single_top_tW__nominal",
217+
"Query": "[{'treename': 'nominal'}]",
218+
"Codegen": "uproot-raw"
219+
}
220+
]
221+
}
222+
with patch('servicex.dataset_group.DatasetGroup.as_files',
223+
return_value=[ServiceXException("dummy")]), \
224+
patch('servicex.servicex_client.ServiceXClient.get_code_generators',
225+
return_value=codegen_list):
226+
results = deliver(spec, config_path='tests/example_config.yaml')
227+
assert len(results) == 1
228+
with pytest.raises(ServiceXException): # should expect an exception to be thrown on access
229+
for _ in results['sampleA']:
230+
pass
231+
232+
233+
def test_submit_mapping_failure_signed_urls(codegen_list):
234+
from servicex import deliver
235+
spec = {
236+
"General": {"Delivery": "SignedURLs"},
237+
"Sample": [
238+
{
239+
"Name": "sampleA",
240+
"RucioDID": "user.ivukotic:user.ivukotic.single_top_tW__nominal",
241+
"Query": "[{'treename': 'nominal'}]",
242+
"Codegen": "uproot-raw"
243+
}
244+
]
245+
}
246+
with patch('servicex.dataset_group.DatasetGroup.as_signed_urls',
247+
return_value=[ServiceXException("dummy")]), \
248+
patch('servicex.servicex_client.ServiceXClient.get_code_generators',
249+
return_value=codegen_list):
250+
results = deliver(spec, config_path='tests/example_config.yaml', return_exceptions=False)
251+
assert len(results) == 1
252+
with pytest.raises(ServiceXException): # should expect an exception to be thrown on access
253+
for _ in results['sampleA']:
254+
pass
182255

183256

184257
def test_yaml(tmp_path):

tests/test_dataset_group.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
from servicex import ResultFormat
3333
from servicex.dataset_group import DatasetGroup
34+
from servicex.query_core import ServiceXException
3435

3536

3637
def test_set_result_format(mocker):
@@ -57,3 +58,39 @@ async def test_as_signed_urls(mocker, transformed_result):
5758
assert len(results) == 2
5859
assert results[0].request_id == "123-45-6789"
5960
assert results[1].request_id == "98-765-432"
61+
62+
63+
@pytest.mark.asyncio
64+
async def test_as_files(mocker, transformed_result):
65+
ds1 = mocker.Mock()
66+
ds1.as_files_async = AsyncMock(return_value=transformed_result)
67+
68+
ds2 = mocker.Mock()
69+
ds2.as_files_async = AsyncMock(return_value=transformed_result.model_copy(
70+
update={"request_id": "98-765-432"}))
71+
72+
group = DatasetGroup([ds1, ds2])
73+
results = await group.as_files_async()
74+
75+
assert len(results) == 2
76+
assert results[0].request_id == "123-45-6789"
77+
assert results[1].request_id == "98-765-432"
78+
79+
80+
@pytest.mark.asyncio
81+
async def test_failure(mocker, transformed_result):
82+
ds1 = mocker.Mock()
83+
ds1.as_signed_urls_async = AsyncMock(return_value=transformed_result)
84+
85+
ds2 = mocker.Mock()
86+
ds2.as_signed_urls_async = AsyncMock(side_effect=ServiceXException("dummy"))
87+
88+
group = DatasetGroup([ds1, ds2])
89+
with pytest.raises(ServiceXException):
90+
await group.as_signed_urls_async()
91+
92+
results = await group.as_signed_urls_async(return_exceptions=True)
93+
94+
assert len(results) == 2
95+
assert results[0].request_id == "123-45-6789"
96+
assert isinstance(results[1], ServiceXException)

tests/test_guardlist.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
from servicex.servicex_client import GuardList
2+
import pytest
3+
4+
5+
def test_guardlist():
6+
gl1 = GuardList([1])
7+
assert str(gl1) == '[1]'
8+
assert gl1[0] == 1
9+
gl2 = GuardList(ValueError())
10+
assert str(gl2) == 'Invalid GuardList: ValueError()'
11+
with pytest.raises(ValueError):
12+
gl2[0]
13+
with pytest.raises(ValueError):
14+
len(gl2)

tests/test_output_handler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,5 @@ def test_output_directory(tmp_path):
1717
}
1818
config = ServiceXSpec(**config)
1919

20-
_output_handler(config, [])
20+
_output_handler(config, [], [])
2121
assert Path(tmp_path, "servicex_fileset.yaml").exists()

0 commit comments

Comments
 (0)