Skip to content

Commit 1827fc8

Browse files
authored
Merge branch 'master' into 474_non_json_errors
2 parents d23b88f + 84fc871 commit 1827fc8

12 files changed

+262
-8
lines changed

docs/command_line.rst

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,3 +76,14 @@ clear
7676
Clear all of the transforms from the cache. Add ``-y`` to force the
7777
operation without confirming with the console.
7878

79+
datasets
80+
~~~~~~~~
81+
82+
These commands interact with datasets cached on the server.
83+
84+
list
85+
^^^^
86+
List all of the datasets cached on the server. Accepts an optional command line argument
87+
of ``--did-finder`` to filter the list of datasets by a specific DID finder such
88+
as ``rucio`` or ``user``.
89+

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ build-backend = "hatchling.build"
66

77
[project]
88
name = "servicex"
9-
version = "3.0.0-alpha.100"
9+
version = "3.0.1-alpha.1"
1010
description = "Python SDK and CLI Client for ServiceX"
1111
readme = "README.md"
1212
license = { text = "BSD-3-Clause" } # SPDX short identifier

servicex/app/datasets.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
# Copyright (c) 2024, IRIS-HEP
2+
# All rights reserved.
3+
#
4+
# Redistribution and use in source and binary forms, with or without
5+
# modification, are permitted provided that the following conditions are met:
6+
#
7+
# * Redistributions of source code must retain the above copyright notice, this
8+
# list of conditions and the following disclaimer.
9+
#
10+
# * Redistributions in binary form must reproduce the above copyright notice,
11+
# this list of conditions and the following disclaimer in the documentation
12+
# and/or other materials provided with the distribution.
13+
#
14+
# * Neither the name of the copyright holder nor the names of its
15+
# contributors may be used to endorse or promote products derived from
16+
# this software without specific prior written permission.
17+
#
18+
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19+
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20+
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21+
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
22+
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23+
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24+
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
25+
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
26+
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27+
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28+
import asyncio
29+
from typing import Optional
30+
31+
import rich
32+
33+
from servicex.app.cli_options import url_cli_option, backend_cli_option
34+
35+
import typer
36+
37+
from servicex.servicex_client import ServiceXClient
38+
from rich.table import Table
39+
40+
datasets_app = typer.Typer(name="datasets", no_args_is_help=True)
41+
42+
43+
# Every option of this command is optional, so running it with no arguments
# must list the datasets; ``no_args_is_help=True`` would print the help text
# and exit instead.
@datasets_app.command(no_args_is_help=False)
def list(
    url: Optional[str] = url_cli_option,
    backend: Optional[str] = backend_cli_option,
    did_finder: Optional[str] = typer.Option(
        None,
        help="Filter datasets by DID finder. Some useful values are 'rucio' or 'user'",
        show_default=False,
    ),
):
    """
    List the datasets.
    """
    # NOTE: the function name doubles as the CLI command name ("list"), which
    # is why it shadows the builtin inside this module. The docstring above is
    # shown as the command's help text, so it is kept short and user-facing.
    sx = ServiceXClient(url=url, backend=backend)

    table = Table(title="ServiceX Datasets")
    for heading in ("ID", "Name", "Files", "Size", "Status", "Created"):
        table.add_column(heading)

    # The client hands back an awaitable; drive it to completion here because
    # the CLI entry point itself is synchronous.
    datasets = asyncio.run(sx.get_datasets(did_finder=did_finder))

    for d in datasets:
        # Format the CachedDataset object into a table row.
        # The last_updated field is what we should be displaying, but that is
        # currently set to 1970-00-00 in the server and is never updated.
        # Stick with the last_used field until
        # https://github.com/ssl-hep/ServiceX/issues/906 is resolved
        table.add_row(
            str(d.id),
            # Datasets from the "user" DID finder are shown with a fixed label
            # rather than their name.
            d.name if d.did_finder != "user" else "File list",
            f"{d.n_files:d}",
            # Size is divided by 1e6 and rounded for display as MB.
            f"{round(d.size / 1e6):,}MB",
            d.lookup_status,
            d.last_used.strftime('%Y-%m-%dT%H:%M:%S'),
        )

    rich.print(table)

servicex/app/main.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import typer
3232

3333
from servicex._version import __version__
34+
from servicex.app.datasets import datasets_app
3435
from servicex.app.transforms import transforms_app
3536
from servicex.app.cache import cache_app
3637
from servicex.app.codegen import codegen_app
@@ -40,6 +41,7 @@
4041
app.add_typer(transforms_app)
4142
app.add_typer(cache_app)
4243
app.add_typer(codegen_app)
44+
app.add_typer(datasets_app)
4345

4446

4547
def show_version(show: bool):

servicex/expandable_progress.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,10 @@ def advance(self, task_id, task_type):
193193
elif self.display_progress and not self.overall_progress:
194194
self.progress.advance(task_id=task_id)
195195

196+
def refresh(self):
    """
    Forward a refresh request to the wrapped progress object.

    Does nothing when ``self.progress`` is ``None``.
    """
    if self.progress is None:
        return
    self.progress.refresh()
199+
196200

197201
class TranformStatusProgress(Progress):
198202
def get_renderables(self):

servicex/models.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,3 +170,18 @@ class TransformedResults(BaseModel):
170170
files: int
171171
result_format: ResultFormat
172172
log_url: Optional[str] = None
173+
174+
175+
class CachedDataset(BaseModel):
    """
    Description of one dataset cached by the ServiceX server.

    Instances are built from the entries of the server's dataset listing.
    """
    # Identifier of the dataset record
    id: int
    # Dataset name
    name: str
    # Name of the DID finder for the dataset (the CLI treats "user" specially)
    did_finder: str
    # Number of files in the dataset
    n_files: int
    # Total size; presumably bytes, since the CLI divides by 1e6 to show MB
    # -- TODO confirm against the server
    size: int
    # Number of events in the dataset
    events: int
    # Timestamp the CLI displays in its "Created" column
    last_used: datetime
    # NOTE(review): reported as never updated by the server, see
    # https://github.com/ssl-hep/ServiceX/issues/906
    last_updated: datetime
    # Lookup status string as reported by the server
    lookup_status: str

servicex/query_core.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ def __init__(
7979
result_format: ResultFormat = ResultFormat.parquet,
8080
ignore_cache: bool = False,
8181
query_string_generator: Optional[QueryStringGenerator] = None,
82+
fail_if_incomplete: bool = True,
8283
):
8384
r"""
8485
This is the main class for constructing transform requests and receiving the
@@ -96,6 +97,7 @@ def __init__(
9697
for new files?
9798
:param result_format:
9899
:param ignore_cache: If true, ignore the cache and always submit a new transform
100+
:param fail_if_incomplete: If true, raise an exception if we don't have 100% completion
99101
"""
100102
self.servicex = sx_adapter
101103
self.configuration = config
@@ -116,6 +118,7 @@ def __init__(
116118

117119
self.request_id = None
118120
self.ignore_cache = ignore_cache
121+
self.fail_if_incomplete = fail_if_incomplete
119122
self.query_string_generator = query_string_generator
120123

121124
# Number of seconds in between ServiceX status polls
@@ -191,6 +194,7 @@ def transform_complete(task: Task):
191194
:param task:
192195
:return:
193196
"""
197+
expandable_progress.refresh()
194198
if task.exception():
195199
logger.error(
196200
f"ServiceX Exception for request ID {self.request_id} ({self.title})\"",
@@ -210,11 +214,16 @@ def transform_complete(task: Task):
210214
self.cache.delete_record_by_request_id(self.request_id)
211215
titlestr = (f'"{self.current_status.title}" '
212216
if self.current_status.title is not None else '')
213-
logger.error(
217+
errorstr = (
214218
f"Transform {titlestr}completed with failures: "
215219
f"{self.current_status.files_failed}/"
216220
f"{self.current_status.files} files failed. Will not cache."
217221
)
222+
failedfiles = (self.servicex.url + '/transformation-request/'
223+
+ f'/{self.request_id}/results?status=failure')
224+
errorstr2 = ("A list of failed files is at [bold red on white]"
225+
f"[link={failedfiles}]this link[/link][/bold red on white]")
226+
logger.error(errorstr2)
218227
logger.error(
219228
f"Transform Request id: {self.current_status.request_id}"
220229
)
@@ -225,6 +234,8 @@ def transform_complete(task: Task):
225234
LogLevel.error,
226235
TimeFrame.month)
227236
logger.error(f"More information of '{self.title}' [bold red on white][link={kibana_link}]HERE[/link][/bold red on white]") # NOQA: E501
237+
if self.fail_if_incomplete:
238+
raise ServiceXException(errorstr)
228239
else:
229240
logger.info("Transforms completed successfully")
230241
else: # pragma: no cover
@@ -234,7 +245,8 @@ def transform_complete(task: Task):
234245
sx_request_hash = sx_request.compute_hash()
235246

236247
# Invalidate the cache if the hash already present but if the user ignores cache
237-
if self.ignore_cache and self.cache.contains_hash(sx_request_hash):
248+
if self.ignore_cache and (self.cache.contains_hash(sx_request_hash)
249+
or self.cache.is_transform_request_submitted(sx_request_hash)):
238250
self.cache.delete_record_by_hash(sx_request_hash)
239251

240252
# Let's see if this is in the cache already, but respect the user's wishes

servicex/servicex_adapter.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
from google.auth import jwt
3636
from tenacity import AsyncRetrying, stop_after_attempt, wait_fixed, retry_if_not_exception_type
3737

38-
from servicex.models import TransformRequest, TransformStatus
38+
from servicex.models import TransformRequest, TransformStatus, CachedDataset
3939

4040

4141
class AuthorizationError(BaseException):
@@ -123,6 +123,22 @@ def get_code_generators(self):
123123
f"Not authorized to access serviceX at {self.url}")
124124
return r.json()
125125

126+
async def get_datasets(self, did_finder=None) -> List[CachedDataset]:
    """
    Fetch the list of cached datasets from the ServiceX server.

    :param did_finder: Optional DID finder name. When truthy it is sent as the
        ``did-finder`` query parameter; otherwise no filter is sent.
    :return: One ``CachedDataset`` per entry under the ``datasets`` key of the
        JSON reply
    :raises AuthorizationError: If the server answers with HTTP 403
    """
    auth_headers = await self._get_authorization()

    query = {}
    if did_finder:
        query["did-finder"] = did_finder

    # NOTE(review): this is a synchronous httpx client used inside a
    # coroutine, so the request blocks the event loop while it is in flight.
    with httpx.Client() as client:
        response = client.get(
            url=f"{self.url}/servicex/datasets",
            headers=auth_headers,
            params=query,
        )

        if response.status_code == 403:
            raise AuthorizationError(
                f"Not authorized to access serviceX at {self.url}")

        return [CachedDataset(**entry) for entry in response.json()['datasets']]
141+
126142
async def submit_transform(self, transform_request: TransformRequest) -> str:
127143
headers = await self._get_authorization()
128144
retry_options = ExponentialRetry(attempts=3, start_timeout=30)

servicex/servicex_client.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ def _load_ServiceXSpec(
137137
return config
138138

139139

140-
def _build_datasets(config, config_path, servicex_name):
140+
def _build_datasets(config, config_path, servicex_name, fail_if_incomplete):
141141
def get_codegen(_sample: Sample, _general: General):
142142
if _sample.Codegen is not None:
143143
return _sample.Codegen
@@ -158,6 +158,7 @@ def get_codegen(_sample: Sample, _general: General):
158158
result_format=config.General.OutputFormat.to_ResultFormat(),
159159
ignore_cache=sample.IgnoreLocalCache,
160160
query=sample.Query,
161+
fail_if_incomplete=fail_if_incomplete
161162
)
162163
logger.debug(f"Query string: {query.generate_selection_string()}")
163164
query.ignore_cache = sample.IgnoreLocalCache
@@ -197,11 +198,12 @@ def deliver(
197198
config: Union[ServiceXSpec, Mapping[str, Any], str, Path],
198199
config_path: Optional[str] = None,
199200
servicex_name: Optional[str] = None,
200-
return_exceptions: bool = True
201+
return_exceptions: bool = True,
202+
fail_if_incomplete: bool = True
201203
):
202204
config = _load_ServiceXSpec(config)
203205

204-
datasets = _build_datasets(config, config_path, servicex_name)
206+
datasets = _build_datasets(config, config_path, servicex_name, fail_if_incomplete)
205207

206208
group = DatasetGroup(datasets)
207209

@@ -278,6 +280,13 @@ async def get_transform_status_async(self, transform_id) -> TransformStatus:
278280

279281
get_transform_status = make_sync(get_transform_status_async)
280282

283+
def get_datasets(self, did_finder: Optional[str] = None):
    r"""
    Retrieve the datasets cached on the ServiceX server.

    The call is forwarded unchanged to ``self.servicex.get_datasets`` and its
    result is returned as is; nothing is awaited here.

    :param did_finder: Optional DID finder name used to filter the datasets
    :return: Whatever ``self.servicex.get_datasets`` returns. Presumably an
        awaitable resolving to a list of CachedDataset objects, since the CLI
        drives it with ``asyncio.run`` -- confirm against the adapter.
    """
    return self.servicex.get_datasets(did_finder)
289+
281290
def get_code_generators(self, backend=None):
282291
r"""
283292
Retrieve the code generators deployed with the serviceX instance
@@ -302,6 +311,7 @@ def generic_query(
302311
title: str = "ServiceX Client",
303312
result_format: ResultFormat = ResultFormat.parquet,
304313
ignore_cache: bool = False,
314+
fail_if_incomplete: bool = True,
305315
) -> Query:
306316
r"""
307317
Generate a Query object for a generic codegen specification
@@ -345,6 +355,7 @@ def generic_query(
345355
query_cache=self.query_cache,
346356
result_format=result_format,
347357
ignore_cache=ignore_cache,
348-
query_string_generator=query
358+
query_string_generator=query,
359+
fail_if_incomplete=fail_if_incomplete
349360
)
350361
return qobj

tests/test_expandable_progress.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,14 @@ def test_overall_progress_mock(mock_progress):
6161
assert mock_progress.return_value.stop.call_count == 1
6262

6363

64+
@patch(
    "servicex.expandable_progress.TranformStatusProgress",
    return_value=MagicMock(TranformStatusProgress),
)
def test_refresh_mock(mock_progress):
    """refresh() on the wrapper is forwarded exactly once to the progress object."""
    with ExpandableProgress(overall_progress=True) as expandable:
        expandable.refresh()
        patched_instance = mock_progress.return_value
        patched_instance.refresh.assert_called_once()
70+
71+
6472
def test_provided_progress(mocker):
6573
class MockedProgress(TranformStatusProgress):
6674
def __init__(self):

0 commit comments

Comments
 (0)