Merged
30 changes: 30 additions & 0 deletions docs/datasets.rst
@@ -0,0 +1,30 @@
Specifying input datasets
=========================

When making a ServiceX request, you need to specify where the source data are to be found. This is done by creating instances of ``servicex.dataset`` classes (when writing Python code) or by using the appropriate YAML tags (when writing a YAML configuration). The possibilities are given below:

Rucio
^^^^^
This will look up a dataset using a query to the Rucio data management system. The request is assumed to be for a Rucio dataset or container.

* Python: ``{ "Dataset": servicex.dataset.Rucio("my.rucio.dataset.name") }``
* YAML: ``Dataset: !Rucio my.rucio.dataset.name``

CERN Open Data Portal
^^^^^^^^^^^^^^^^^^^^^
This looks up files in datasets using their integer ID in the `CERN Open Data Portal <https://opendata.cern.ch/>`_. (If you access the web page for a dataset, this is the number following ``opendata.cern.ch/record/`` in the URL.)

* Python: ``{ "Dataset": servicex.dataset.CERNOpenData(179) }``
* YAML: ``Dataset: !CERNOpenData 179``

A list of network-accessible files
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
If you have URLs for files that ServiceX can access via either the HTTP or XRootD protocols, these URLs can be given directly to ServiceX. Note that the ServiceX instance must have permission to read these files; in particular, if generic members of your experiment can't access the files, ServiceX probably won't be able to either.

* Python: ``{ "Dataset": servicex.dataset.FileList(["http://server/file1.root", "root://server/file2.root"]) }``
* YAML: ``Dataset: !FileList ["http://server/file1.root", "root://server/file2.root"]``

XRootD pattern (useful for EOS)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Files can also be located using wildcard patterns with XRootD. So, for example, if you want to include all files in the directory ``/eos/opendata/mystuff`` in the CERN EOS system, given that the directory is made available to the world as ``root://eospublic.cern.ch//eos/opendata/mystuff``, you can ask for ``root://eospublic.cern.ch//eos/opendata/mystuff/*``.

*Note: available from ServiceX client version 3.0.1.*

* Python: ``{ "Dataset": servicex.dataset.XRootD("root://eospublic.cern.ch//eos/opendata/mystuff/*") }``
* YAML: ``Dataset: !XRootD root://eospublic.cern.ch//eos/opendata/mystuff/*``
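
Taken together, a hypothetical YAML request can mix these dataset tags across several samples. The sample names and ``!UprootRaw`` query strings below are placeholders, not part of any real configuration:

```yaml
Sample:
  - Name: my_rucio_sample
    Dataset: !Rucio my.rucio.dataset.name
    Query: !UprootRaw '[{"treename": "nominal"}]'
  - Name: my_eos_sample
    Dataset: !XRootD root://eospublic.cern.ch//eos/opendata/mystuff/*
    Query: !UprootRaw '[{"treename": "nominal"}]'
```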
1 change: 1 addition & 0 deletions docs/index.rst
@@ -65,6 +65,7 @@ re-running queries that have already been executed.
connect_servicex
query_types
transform_request
datasets
examples
command_line
contribute
4 changes: 2 additions & 2 deletions docs/transform_request.rst
@@ -41,5 +41,5 @@ data, select ``LocalCache`` for ``Delivery``. If you are located at an analysis
The Definitions Sections
^^^^^^^^^^^^^^^^^^^^^^^^

The Definitions section is a dictionary of values that can be substituted into fields in the Sample
sections. This is useful for defining common values that are used in multiple samples. This is an advanced concept.
The Definitions section (only available when setting up the request using YAML files) is a list of values that can be substituted into fields in the Sample
sections, defined using the YAML anchor/alias syntax. This is useful for defining common values that are used in multiple samples. This is an advanced concept.
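
As a sketch of the anchor/alias mechanism described above (the ``DEF_query`` name, dataset names, and query value are hypothetical), a value anchored with ``&`` in the Definitions section can be reused via ``*`` aliases in several samples:

```yaml
Definitions:
  - &DEF_query !UprootRaw '[{"treename": "nominal"}]'

Sample:
  - Name: sampleA
    Dataset: !Rucio my.rucio.dataset.one
    Query: *DEF_query
  - Name: sampleB
    Dataset: !Rucio my.rucio.dataset.two
    Query: *DEF_query
```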
1 change: 1 addition & 0 deletions pyproject.toml
@@ -107,6 +107,7 @@ UprootRaw = "servicex.uproot_raw.uproot_raw:UprootRawQuery"
Rucio = "servicex.dataset_identifier:RucioDatasetIdentifier"
FileList = "servicex.dataset_identifier:FileListDataset"
CERNOpenData = "servicex.dataset_identifier:CERNOpenDataDatasetIdentifier"
XRootD = "servicex.dataset_identifier:XRootDDatasetIdentifier"

[tool.hatch.build.targets.sdist]
# hatchling always includes:
5 changes: 3 additions & 2 deletions servicex/dataset/__init__.py
@@ -29,7 +29,8 @@
from ..dataset_identifier import (RucioDatasetIdentifier as Rucio, # noqa: F401
FileListDataset as FileList,
CERNOpenDataDatasetIdentifier as CERNOpenData,
DataSetIdentifier as GenericDataSet)
DataSetIdentifier as GenericDataSet,
XRootDDatasetIdentifier as XRootD)


__any__ = ['Rucio', 'FileList', 'CERNOpenData', 'GenericDataSet']
__any__ = ['Rucio', 'FileList', 'CERNOpenData', 'GenericDataSet', 'XRootD']
20 changes: 20 additions & 0 deletions servicex/dataset_identifier.py
@@ -117,3 +117,23 @@ def __init__(self, dataset: int, num_files: Optional[int] = None):
    @classmethod
    def from_yaml(cls, _, node):
        return cls(int(node.value))


class XRootDDatasetIdentifier(DataSetIdentifier):
    def __init__(self, pattern: str, num_files: Optional[int] = None):
        r"""
        XRootD dataset - files are located by expanding the given wildcard
        pattern via the XRootD protocol.

        :param pattern: The XRootD URL pattern, which may contain wildcards.
        :param num_files: Maximum number of files to return. This is useful during development
                          to perform quick runs. ServiceX is careful to make sure it always
                          returns the same subset of files.

        """
        super().__init__("xrootd", pattern, num_files=num_files)

    yaml_tag = '!XRootD'

    @classmethod
    def from_yaml(cls, _, node):
        return cls(node.value)
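
As a minimal standalone sketch (not the real servicex code), the base-class behavior assumed here is that the DID string is the scheme joined to the value with ``://``, which matches the ``xrootd://...`` strings checked in the tests:

```python
# Illustration of how the XRootD identifier's DID string is formed:
# DataSetIdentifier("xrootd", pattern) is assumed to yield "xrootd://<pattern>".
def make_xrootd_did(pattern: str) -> str:
    return f"xrootd://{pattern}"

print(make_xrootd_did("root://blablabla/*/?.root"))
# → xrootd://root://blablabla/*/?.root
```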
17 changes: 17 additions & 0 deletions tests/test_databinder.py
@@ -168,6 +168,19 @@ def test_cernopendata():
assert spec.Sample[0].dataset_identifier.did == "cernopendata://1507"


def test_xrootd():
    spec = ServiceXSpec.model_validate({
        "Sample": [
            {
                "Name": "sampleA",
                "Dataset": dataset.XRootD('root://blablabla/*/?.root'),
                "Function": "a"
            }
        ]
    })
    assert spec.Sample[0].dataset_identifier.did == "xrootd://root://blablabla/*/?.root"


def test_invalid_dataset_identifier():
with pytest.raises(ValidationError):
ServiceXSpec.model_validate(
@@ -328,6 +341,8 @@ def run_query(input_filenames=None):
- Name: ttH6
Dataset: !CERNOpenData 1507
Query: !UprootRaw '[{"treename": "nominal"}]'
- Name: ttH7
Dataset: !XRootD root://eosatlas.cern.ch//eos/atlas/path/*/file.root
""")
f.flush()
result = _load_ServiceXSpec(path)
@@ -342,6 +357,8 @@ def run_query(input_filenames=None):
== ["/path/to/file1.root", "/path/to/file2.root"])
assert isinstance(result.Sample[5].dataset_identifier, CERNOpenData)
assert result.Sample[5].dataset_identifier.did == 'cernopendata://1507'
assert (result.Sample[6].dataset_identifier.did
== 'xrootd://root://eosatlas.cern.ch//eos/atlas/path/*/file.root')

# Path from string
result2 = _load_ServiceXSpec(str(path))
38 changes: 21 additions & 17 deletions tests/test_servicex_dataset.py
@@ -27,7 +27,7 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import tempfile
from typing import List
from unittest.mock import AsyncMock
from unittest.mock import AsyncMock, patch
from pathlib import PurePath

import pytest
@@ -422,7 +422,7 @@ async def test_submit_fatal(mocker):


@pytest.mark.asyncio
async def test_submit_generic(mocker):
async def test_submit_generic(mocker, codegen_list):
""" Uses Uproot-Raw classes which go through the generic query mechanism """
import json
sx = AsyncMock()
@@ -442,14 +442,16 @@ async def test_submit_generic(mocker, codegen_list):
mock_cache = mocker.MagicMock(QueryCache)
mocker.patch("servicex.minio_adapter.MinioAdapter", return_value=mock_minio)
did = FileListDataset("/foo/bar/baz.root")
client = ServiceXClient(backend='servicex-uc-af', config_path='tests/example_config.yaml')
client.servicex = sx
client.query_cache = mock_cache
with patch('servicex.servicex_adapter.ServiceXAdapter.get_code_generators',
return_value=codegen_list):
client = ServiceXClient(backend='servicex-uc-af', config_path='tests/example_config.yaml')
client.servicex = sx
client.query_cache = mock_cache

datasource = client.generic_query(
dataset_identifier=did,
query=UprootRawQuery({'treename': 'CollectionTree'})
)
datasource = client.generic_query(
dataset_identifier=did,
query=UprootRawQuery({'treename': 'CollectionTree'})
)
with ExpandableProgress(display_progress=False) as progress:
datasource.result_format = ResultFormat.parquet
_ = await datasource.submit_and_download(signed_urls_only=False,
@@ -468,7 +470,7 @@


@pytest.mark.asyncio
async def test_submit_cancelled(mocker):
async def test_submit_cancelled(mocker, codegen_list):
""" Uses Uproot-Raw classes which go through the query cancelled mechanism """
import json
sx = AsyncMock()
@@ -486,14 +488,16 @@ async def test_submit_cancelled(mocker, codegen_list):
mock_cache = mocker.MagicMock(QueryCache)
mocker.patch("servicex.minio_adapter.MinioAdapter", return_value=mock_minio)
did = FileListDataset("/foo/bar/baz.root")
client = ServiceXClient(backend='servicex-uc-af', config_path='tests/example_config.yaml')
client.servicex = sx
client.query_cache = mock_cache
with patch('servicex.servicex_adapter.ServiceXAdapter.get_code_generators',
return_value=codegen_list):
client = ServiceXClient(backend='servicex-uc-af', config_path='tests/example_config.yaml')
client.servicex = sx
client.query_cache = mock_cache

datasource = client.generic_query(
dataset_identifier=did,
query=UprootRawQuery({'treename': 'CollectionTree'})
)
datasource = client.generic_query(
dataset_identifier=did,
query=UprootRawQuery({'treename': 'CollectionTree'})
)
with ExpandableProgress(display_progress=False) as progress:
datasource.result_format = ResultFormat.parquet
_ = await datasource.submit_and_download(signed_urls_only=False,