Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ dependencies = [
"qastle>=0.17",
"func_adl>=3.2.6",
"requests>=2.31",
"pydantic>=1.10,<2.0", # FIXME: Adopt pydantic 2.0 API
"pydantic>=2.0",
"httpx>=0.24",
"miniopy-async>=1.15",
"tinydb>=4.7",
Expand Down
4 changes: 2 additions & 2 deletions servicex/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from servicex import models
from servicex import servicex_client
from servicex import dataset_identifier
from servicex.databinder_models import Sample, General, Definition, ServiceXSpec
from servicex.databinder_models import Sample, General, DefinitionDict, ServiceXSpec
from servicex.func_adl.func_adl_dataset import FuncADLQuery
from servicex.servicex_client import ServiceXClient, deliver
from .query import Query
Expand All @@ -53,7 +53,7 @@
"FuncADLQuery",
"Sample",
"General",
"Definition",
"DefinitionDict",
"ServiceXSpec",
"deliver"
]
26 changes: 15 additions & 11 deletions servicex/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,36 +30,39 @@
from pathlib import Path, PurePath
from typing import List, Optional, Dict

from pydantic import BaseModel, Field, validator
from pydantic import BaseModel, Field, AliasChoices, model_validator

import yaml


class Endpoint(BaseModel):
endpoint: str
name: str
token: Optional[str]
token: Optional[str] = ""


class Configuration(BaseModel):
api_endpoints: List[Endpoint]
default_endpoint: Optional[str] = Field(alias="default-endpoint", default=None)
cache_path: Optional[str] = Field(alias="cache-path", default=None)
cache_path: Optional[str] = Field(
validation_alias=AliasChoices("cache-path", "cache_path"), default=None
)

shortened_downloaded_filename: Optional[bool] = False

@validator("cache_path", always=True)
def expand_cache_path(cls, v):
@model_validator(mode="after")
def expand_cache_path(self):
"""
Expand the cache path to a full path, and create it if it doesn't exist.
Expand ${USER} to be the user name on the system. Works for windows, too.
:param v:
:return:
"""
# create a folder inside the tmp directory if not specified in cache_path
if not v:
v = "/tmp/servicex_${USER}"
if not self.cache_path:
self.cache_path = "/tmp/servicex_${USER}"

s_path = os.path.expanduser(v)
s_path = os.path.expanduser(self.cache_path)

# If they have tried to use the USER or UserName as an expansion, and it has failed, then
# translate it to maintain harmony across platforms.
Expand All @@ -75,10 +78,11 @@ def expand_cache_path(cls, v):
p = Path(p_p)
p.mkdir(exist_ok=True, parents=True)

return p.as_posix()
self.cache_path = p.as_posix()
return self

class Config:
allow_population_by_field_name = True
populate_by_name = True

def endpoint_dict(self) -> Dict[str, Endpoint]:
return {endpoint.name: endpoint for endpoint in self.api_endpoints}
Expand All @@ -98,7 +102,7 @@ def read(cls, config_path: Optional[str] = None):
yaml_config = cls._add_from_path(walk_up_tree=True)

if yaml_config:
return Configuration(**yaml_config)
return Configuration.model_validate(yaml_config)
else:
path_extra = f"in {config_path}" if config_path else ""
raise NameError(
Expand Down
81 changes: 48 additions & 33 deletions servicex/databinder_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,21 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from enum import Enum
from typing import Union, Optional, Callable, List
from pydantic import BaseModel, Field, root_validator, constr, validator
from pydantic import (
BaseModel,
Field,
model_validator,
)

from servicex.dataset_identifier import RucioDatasetIdentifier, FileListDataset
from servicex.func_adl import func_adl_dataset


class Sample(BaseModel):
Name: str
Codegen: Optional[str]
RucioDID: Optional[str]
XRootDFiles: Optional[Union[str, List[str]]]
Codegen: Optional[str] = None
RucioDID: Optional[str] = None
XRootDFiles: Optional[Union[str, List[str]]] = None
NFiles: Optional[int] = Field(default=None)
Function: Optional[Union[str, Callable]] = Field(default=None)
Query: Optional[Union[str, func_adl_dataset.Query]] = Field(default=None)
Expand All @@ -54,29 +58,31 @@ def dataset_identifier(self):
elif self.XRootDFiles:
return FileListDataset(self.XRootDFiles)

@root_validator
@model_validator(mode="before")
@classmethod
def validate_did_xor_file(cls, values):
"""
Ensure that only one of RootFile or RucioDID is specified.
:param values:
:return:
"""
if values['XRootDFiles'] and values['RucioDID']:
if "XRootDFiles" in values and "RucioDID" in values:
raise ValueError("Only specify one of XRootDFiles or RucioDID, not both.")
if not values['XRootDFiles'] and not values['RucioDID']:
if "XRootDFiles" not in values and "RucioDID" not in values:
raise ValueError("Must specify one of XRootDFiles or RucioDID.")
return values

@root_validator
@model_validator(mode="before")
@classmethod
def validate_function_xor_query(cls, values):
"""
Ensure that only one of Function or Query is specified.
:param values:
:return:
"""
if values['Function'] and values['Query']:
if "Function" in values and "Query" in values:
raise ValueError("Only specify one of Function or Query, not both.")
if not values['Function'] and not values['Query']:
if "Function" not in values and "Query" not in values:
raise ValueError("Must specify one of Function or Query.")
return values

Expand All @@ -92,17 +98,21 @@ class DeliveryEnum(str, Enum):

ServiceX: str = Field(..., alias="ServiceX")
Codegen: str
OutputFormat: Union[OutputFormatEnum,
constr(regex='^(parquet|root-file)$')] = Field(default=OutputFormatEnum.root) # NOQA F722
OutputFormat: OutputFormatEnum = (
Field(default=OutputFormatEnum.root, pattern="^(parquet|root-file)$")
) # NOQA F722

Delivery: Union[DeliveryEnum, constr(regex='^(LocalCache|SignedURLs)$')] = Field(default=DeliveryEnum.LocalCache) # NOQA F722
Delivery: DeliveryEnum = Field(
default=DeliveryEnum.LocalCache, pattern="^(LocalCache|SignedURLs)$"
) # NOQA F722


class Definition(BaseModel):
class DefinitionDict(BaseModel):
class Config:
extra = "allow" # Allow additional fields not defined in the model

@root_validator
@model_validator(mode="before")
@classmethod
def check_def_name(cls, values):
"""
Ensure that the definition name is DEF_XXX format
Expand All @@ -111,31 +121,36 @@ def check_def_name(cls, values):
"""
for field in values:
if not field.startswith("DEF_"):
raise ValueError(f"Definition key {field} does not meet the convention of DEF_XXX format") # NOQA E501
raise ValueError(
f"Definition key {field} does not meet the convention of DEF_XXX format"
) # NOQA E501
return values


class ServiceXSpec(BaseModel):
General: General
Sample: List[Sample]
Definition: Optional[Definition]

@validator('Sample')
def check_tree_property(cls, v, values):
if 'General' in values and values['General'].Codegen != 'uproot':
for sample in v:
if 'Tree' in sample:
raise ValueError('"Tree" property is not allowed when codegen is not "uproot"')
return v

@root_validator(pre=False, skip_on_failure=True)
def replace_definition(cls, values):
Definition: Optional[DefinitionDict] = None

@model_validator(mode="after")
def check_tree_property(self):
if self.General and self.General.Codegen != "uproot":
for sample in self.Sample:
if "Tree" in sample:
raise ValueError(
'"Tree" property is not allowed when codegen is not "uproot"'
)
return self

@model_validator(mode="after")
def replace_definition(self):
"""
Replace the definition name with the actual definition value looking
through the Samples and the General sections
:param values:
:return:
"""

def replace_value_from_def(value, defs):
"""
Replace the value with the actual definition value
Expand All @@ -154,14 +169,14 @@ def replace_value_from_def(value, defs):
raise ValueError(f"Definition {value} not found")
return value

if 'Definition' in values and values['Definition'] is not None:
defs = values['Definition'].dict()
if self.Definition and self.Definition:
defs = self.Definition.dict()
else:
defs = {}

for sample_field in values['Sample']:
for sample_field in self.Sample:
replace_value_from_def(sample_field.__dict__, defs)

replace_value_from_def(values['General'].__dict__, defs)
replace_value_from_def(self.General.__dict__, defs)

return values
return self
56 changes: 33 additions & 23 deletions servicex/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@
from datetime import datetime
from enum import Enum

from pydantic import BaseModel, Field, validator
from pydantic import BaseModel, Field, field_validator
from typing import List, Optional


class ResultDestination(str, Enum):
r"""
Direct the output to object store or posix volume
"""

object_store = "object-store"
volume = "volume"

Expand All @@ -45,6 +46,7 @@ class ResultFormat(str, Enum):
r"""
Specify the file format for the generated output
"""

parquet = "parquet"
root = "root-file"

Expand All @@ -53,6 +55,7 @@ class Status(str, Enum):
r"""
Status of a submitted transform
"""

complete = ("Complete",)
fatal = ("Fatal",)
canceled = ("Canceled",)
Expand All @@ -65,18 +68,21 @@ class TransformRequest(BaseModel):
r"""
Transform request sent to ServiceX
"""

title: Optional[str] = None
did: Optional[str] = None
file_list: Optional[List[str]] = Field(default=None, alias="file-list")
selection: str
image: Optional[str] = None
codegen: str
tree_name: Optional[str] = Field(default=None, alias="tree-name")
result_destination: ResultDestination = Field(alias="result-destination")
result_format: ResultFormat = Field(alias="result-format")
result_destination: ResultDestination = Field(
serialization_alias="result-destination"
)
result_format: ResultFormat = Field(serialization_alias="result-format")

class Config:
allow_population_by_field_name = True
populate_by_name = True

def compute_hash(self):
r"""
Expand Down Expand Up @@ -105,30 +111,32 @@ class TransformStatus(BaseModel):
r"""
Status object returned by servicex
"""

request_id: str
did: str
title: Optional[str]
title: Optional[str] = None
selection: str
tree_name: Optional[str] = Field(alias="tree-name")
tree_name: Optional[str] = Field(validation_alias="tree-name")
image: str
result_destination: ResultDestination = Field(alias="result-destination")
result_format: ResultFormat = Field(alias="result-format")
generated_code_cm: str = Field(alias="generated-code-cm")
result_destination: ResultDestination = Field(validation_alias="result-destination")
result_format: ResultFormat = Field(validation_alias="result-format")
generated_code_cm: str = Field(validation_alias="generated-code-cm")
status: Status
app_version: str = Field(alias="app-version")
app_version: str = Field(validation_alias="app-version")
files: int
files_completed: int = Field(alias="files-completed")
files_failed: int = Field(alias="files-failed")
files_remaining: Optional[int] = Field(alias="files-remaining")
submit_time: datetime = Field(alias="submit-time")
finish_time: Optional[datetime] = Field(alias="finish-time")
minio_endpoint: Optional[str] = Field(alias="minio-endpoint")
minio_secured: Optional[bool] = Field(alias="minio-secured")
minio_access_key: Optional[str] = Field(alias="minio-access-key")
minio_secret_key: Optional[str] = Field(alias="minio-secret-key")
log_url: Optional[str] = Field(alias="log-url")

@validator("finish_time", pre=True)
files_completed: int = Field(validation_alias="files-completed")
files_failed: int = Field(validation_alias="files-failed")
files_remaining: Optional[int] = Field(validation_alias="files-remaining", default=0)
submit_time: datetime = Field(validation_alias="submit-time", default=None)
finish_time: Optional[datetime] = Field(validation_alias="finish-time", default=None)
minio_endpoint: Optional[str] = Field(validation_alias="minio-endpoint", default=None)
minio_secured: Optional[bool] = Field(validation_alias="minio-secured", default=None)
minio_access_key: Optional[str] = Field(validation_alias="minio-access-key", default=None)
minio_secret_key: Optional[str] = Field(validation_alias="minio-secret-key", default=None)
log_url: Optional[str] = Field(validation_alias="log-url", default=None)

@field_validator("finish_time", mode="before")
@classmethod
def parse_finish_time(cls, v):
if isinstance(v, str) and v == "None":
return None
Expand All @@ -139,6 +147,7 @@ class ResultFile(BaseModel):
r"""
Record reporting the properties of a transformed file result
"""

filename: str
size: int
extension: str
Expand All @@ -149,6 +158,7 @@ class TransformedResults(BaseModel):
Returned for a submission. Gives you everything you need to know about a completed
transform.
"""

hash: str
title: str
codegen: str
Expand All @@ -159,4 +169,4 @@ class TransformedResults(BaseModel):
signed_url_list: List[str]
files: int
result_format: ResultFormat
log_url: Optional[str]
log_url: Optional[str] = None
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def completed_status() -> TransformStatus:
"files-failed": 0,
"files-remaining": 1,
"submit-time": "2023-05-25T20:05:05.564137Z",
"finish-time": "None",
"finish-time": None,
"minio-endpoint": "minio.org:9000",
"minio-secured": False,
"minio-access-key": "miniouser",
Expand Down
Loading