Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 29 additions & 15 deletions servicex/servicex_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@
from pathlib import Path

from servicex.configuration import Configuration
from servicex.models import ResultFormat, TransformStatus, TransformedResults
from servicex.models import (
ResultFormat,
TransformStatus,
TransformedResults,
CachedDataset,
)
from servicex.query_cache import QueryCache
from servicex.servicex_adapter import ServiceXAdapter
from servicex.query_core import (
Expand All @@ -44,7 +49,7 @@

from make_it_sync import make_sync
from servicex.databinder_models import ServiceXSpec, General, Sample
from collections.abc import Sequence
from collections.abc import Sequence, Coroutine
from enum import Enum
import traceback

Expand Down Expand Up @@ -115,6 +120,15 @@ def __repr__(self):
return f"Invalid GuardList: {repr(data._exc)}"


def _async_execute_and_wait(coro: Coroutine) -> Any:
import asyncio

try:
return asyncio.create_task(coro).result()
except RuntimeError:
return asyncio.run(coro)


def _load_ServiceXSpec(
config: Union[ServiceXSpec, Mapping[str, Any], str, Path],
) -> ServiceXSpec:
Expand Down Expand Up @@ -353,40 +367,40 @@ async def get_transform_status_async(self, transform_id) -> TransformStatus:

get_transform_status = make_sync(get_transform_status_async)

def get_datasets(self, did_finder=None, show_deleted=False):
def get_datasets(self, did_finder=None, show_deleted=False) -> List[CachedDataset]:
r"""
Retrieve all datasets you have run on the server
:return: List of Query objects
"""
return self.servicex.get_datasets(did_finder, show_deleted)
return _async_execute_and_wait(
self.servicex.get_datasets(did_finder, show_deleted)
)

def get_dataset(self, dataset_id):
def get_dataset(self, dataset_id) -> CachedDataset:
r"""
Retrieve a dataset by its ID
:return: A Query object
"""
return self.servicex.get_dataset(dataset_id)
return _async_execute_and_wait(self.servicex.get_dataset(dataset_id))

def delete_dataset(self, dataset_id):
def delete_dataset(self, dataset_id) -> bool:
r"""
Delete a dataset by its ID
:return: A Query object
:return: boolean showing whether the dataset has been deleted
"""
return self.servicex.delete_dataset(dataset_id)
return _async_execute_and_wait(self.servicex.delete_dataset(dataset_id))

def delete_transform(self, transform_id):
def delete_transform(self, transform_id) -> None:
r"""
Delete a Transform by its request ID
:return: A Query object
"""
return self.servicex.delete_transform(transform_id)
return _async_execute_and_wait(self.servicex.delete_transform(transform_id))

def cancel_transform(self, transform_id):
def cancel_transform(self, transform_id) -> None:
r"""
Cancel a Transform by its request ID
:return: A Query object
"""
return self.servicex.cancel_transform(transform_id)
return _async_execute_and_wait(self.servicex.cancel_transform(transform_id))

def get_code_generators(self, backend=None):
r"""
Expand Down