Skip to content

Commit b7f1213

Browse files
gordonwatts and BenGalewsky
authored and committed
Move the fancy rich prints over to being logging calls.
1 parent 27c94a8 commit b7f1213

File tree

2 files changed

+117
-68
lines changed

2 files changed

+117
-68
lines changed

servicex/query.py

Lines changed: 97 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import asyncio
3232
from abc import ABC
3333
from asyncio import Task, CancelledError
34+
import logging
3435
from typing import List, Optional, Union
3536
from servicex.expandable_progress import ExpandableProgress
3637

@@ -43,9 +44,7 @@
4344
from servicex.types import DID
4445

4546
import rich
46-
from rich.progress import (
47-
Progress,
48-
TaskID)
47+
from rich.progress import Progress, TaskID
4948

5049
from servicex.configuration import Configuration
5150
from servicex.minio_adapter import MinioAdapter
@@ -157,9 +156,10 @@ def set_result_format(self, result_format: ResultFormat):
157156
return self
158157

159158
async def submit_and_download(
160-
self, signed_urls_only: bool,
161-
expandable_progress: ExpandableProgress,
162-
dataset_group: Optional[bool] = False,
159+
self,
160+
signed_urls_only: bool,
161+
expandable_progress: ExpandableProgress,
162+
dataset_group: Optional[bool] = False,
163163
) -> Optional[TransformedResults]:
164164
"""
165165
Submit the transform request to ServiceX. Poll the transform status to see when
@@ -186,27 +186,31 @@ def transform_complete(task: Task):
186186
:return:
187187
"""
188188
if task.exception():
189-
rich.print("ServiceX Exception", task.exception())
189+
logging.getLogger(__name__).error(
190+
"ServiceX Exception", exc_info=task.exception()
191+
)
190192
if download_files_task:
191193
download_files_task.cancel("Transform failed")
192194
raise task.exception()
193195

194196
if self.current_status.files_failed:
195-
rich.print(
196-
f"[bold red]Transforms completed with failures[/bold red] "
197+
logging.getLogger(__name__).warning(
198+
f"Transforms completed with failures "
197199
f"{self.current_status.files_failed} files failed out of "
198200
f"{self.current_status.files}"
199201
)
200202
else:
201-
rich.print("Transforms completed successfully")
203+
logging.getLogger(__name__).info("Transforms completed successfully")
202204

203205
sx_request = self.transform_request
204206

205207
# Let's see if this is in the cache already, but respect the user's wishes
206208
# to ignore the cache
207-
cached_record = self.cache.get_transform_by_hash(sx_request.compute_hash()) \
208-
if not self.ignore_cache \
209+
cached_record = (
210+
self.cache.get_transform_by_hash(sx_request.compute_hash())
211+
if not self.ignore_cache
209212
else None
213+
)
210214

211215
# And that we grabbed the resulting files in the way that the user requested
212216
# (Downloaded, or obtained pre-signed URLs)
@@ -217,17 +221,21 @@ def transform_complete(task: Task):
217221
or not signed_urls_only
218222
and cached_record.file_list
219223
):
220-
rich.print("Returning results from cache")
224+
logging.getLogger(__name__).info("Returning results from cache")
221225
return cached_record
222226

223227
# If we get here with a cached record, then we know that the transform
224228
# has been run, but we just didn't get the files from object store in the way
225229
# requested by user
226230
transform_bar_title = f"{sx_request.title}: Transform"
227231
if not cached_record:
228-
transform_progress = expandable_progress.add_task(
229-
transform_bar_title, start=False, total=None
230-
) if expandable_progress else None
232+
transform_progress = (
233+
expandable_progress.add_task(
234+
transform_bar_title, start=False, total=None
235+
)
236+
if expandable_progress
237+
else None
238+
)
231239
else:
232240
self.request_id = cached_record.request_id
233241
transform_progress = None
@@ -236,19 +244,28 @@ def transform_complete(task: Task):
236244
minio_progress_bar_title = (
237245
"Download" if not signed_urls_only else "Signing URLS"
238246
)
239-
minio_progress_bar_title = minio_progress_bar_title.rjust(len(transform_bar_title))
247+
minio_progress_bar_title = minio_progress_bar_title.rjust(
248+
len(transform_bar_title)
249+
)
240250

241-
download_progress = expandable_progress.add_task(
242-
minio_progress_bar_title, start=False, total=None
243-
) if expandable_progress else None
251+
download_progress = (
252+
expandable_progress.add_task(
253+
minio_progress_bar_title, start=False, total=None
254+
)
255+
if expandable_progress
256+
else None
257+
)
244258

245259
if not cached_record:
246260
self.request_id = await self.servicex.submit_transform(sx_request)
247261

248262
monitor_task = loop.create_task(
249263
self.transform_status_listener(
250-
expandable_progress, transform_progress, transform_bar_title,
251-
download_progress, minio_progress_bar_title
264+
expandable_progress,
265+
transform_progress,
266+
transform_bar_title,
267+
download_progress,
268+
minio_progress_bar_title,
252269
)
253270
)
254271
monitor_task.add_done_callback(transform_complete)
@@ -289,11 +306,17 @@ def transform_complete(task: Task):
289306

290307
return transform_report
291308
except CancelledError:
292-
rich.print("Aborted file downloads due to transform failure")
309+
logging.getLogger(__name__).warning(
310+
"Aborted file downloads due to transform failure"
311+
)
293312

294313
async def transform_status_listener(
295-
self, progress: ExpandableProgress, progress_task: TaskID,
296-
progress_bar_title: str, download_task: TaskID, download_bar_title: str
314+
self,
315+
progress: ExpandableProgress,
316+
progress_task: TaskID,
317+
progress_bar_title: str,
318+
download_task: TaskID,
319+
download_bar_title: str,
297320
):
298321
"""
299322
Poll ServiceX for the status of a transform. Update progress bars and keep track
@@ -314,16 +337,21 @@ async def transform_status_listener(
314337
if not final_count and self.current_status.files:
315338
final_count = self.current_status.files
316339
if progress:
317-
progress.update(progress_task, progress_bar_title, total=final_count)
340+
progress.update(
341+
progress_task, progress_bar_title, total=final_count
342+
)
318343
progress.start_task(task_id=progress_task, task_type="Transform")
319344

320-
progress.update(download_task, download_bar_title, total=final_count)
345+
progress.update(
346+
download_task, download_bar_title, total=final_count
347+
)
321348
progress.start_task(task_id=download_task, task_type="Download")
322349

323350
if progress:
324351
progress.update(
325-
progress_task, progress_bar_title,
326-
completed=self.current_status.files_completed
352+
progress_task,
353+
progress_bar_title,
354+
completed=self.current_status.files_completed,
327355
)
328356

329357
if self.current_status.status == Status.complete:
@@ -339,7 +367,9 @@ async def retrieve_current_transform_status(self):
339367
# Is this the first time we've polled status? We now know the request ID.
340368
# Update the display and set our download directory.
341369
if not self.current_status:
342-
rich.print(f"[bold]ServiceX Transform {s.title}: {s.request_id}[/bold]")
370+
logging.getLogger(__name__).info(
371+
f"ServiceX Transform {s.title}: {s.request_id}"
372+
)
343373
self.download_path = self.cache.cache_path_for_transform(s)
344374

345375
self.current_status = s
@@ -434,35 +464,39 @@ async def get_signed_url(
434464
await asyncio.gather(*download_tasks)
435465
return result_uris
436466

437-
async def as_files_async(self,
438-
display_progress: bool = True,
439-
provided_progress: Optional[ProgressIndicators] = None
440-
) -> TransformedResults:
467+
async def as_files_async(
468+
self,
469+
display_progress: bool = True,
470+
provided_progress: Optional[ProgressIndicators] = None,
471+
) -> TransformedResults:
441472
r"""
442473
Submit the transform and request all the resulting files to be downloaded
443474
:return: TransformResult instance with the list of complete paths to the downloaded files
444475
"""
445476
with ExpandableProgress(display_progress, provided_progress) as progress:
446-
return await self.submit_and_download(signed_urls_only=False,
447-
expandable_progress=progress)
477+
return await self.submit_and_download(
478+
signed_urls_only=False, expandable_progress=progress
479+
)
448480

449481
as_files = make_sync(as_files_async)
450482

451483
try:
452-
async def as_pandas_async(self,
453-
display_progress: bool = True,
454-
provided_progress: Optional[ProgressIndicators] = None) \
455-
-> pd.DataFrame:
484+
485+
async def as_pandas_async(
486+
self,
487+
display_progress: bool = True,
488+
provided_progress: Optional[ProgressIndicators] = None,
489+
) -> pd.DataFrame:
456490
r"""
457491
Return a pandas dataframe containing the results. This only works if you've
458492
installed pandas extra
459493
460494
:return: Pandas Dataframe
461495
"""
462496
self.result_format = ResultFormat.parquet
463-
transformed_result = await self.as_files_async(display_progress=display_progress,
464-
provided_progress=provided_progress
465-
)
497+
transformed_result = await self.as_files_async(
498+
display_progress=display_progress, provided_progress=provided_progress
499+
)
466500
dataframes = pd.concat(
467501
[pd.read_parquet(p) for p in transformed_result.file_list]
468502
)
@@ -472,24 +506,31 @@ async def as_pandas_async(self,
472506
except NameError:
473507
pass
474508

475-
async def as_signed_urls_async(self, display_progress: bool = True,
476-
provided_progress: Optional[ProgressIndicators] = None,
477-
dataset_group: bool = False) \
478-
-> TransformedResults:
509+
async def as_signed_urls_async(
510+
self,
511+
display_progress: bool = True,
512+
provided_progress: Optional[ProgressIndicators] = None,
513+
dataset_group: bool = False,
514+
) -> TransformedResults:
479515
r"""
480516
Presign URLs for each of the transformed files
481517
482518
:return: TransformedResults object with the presigned_urls list populated
483519
"""
484520
if dataset_group:
485-
return await self.submit_and_download(signed_urls_only=True,
486-
expandable_progress=provided_progress,
487-
dataset_group=dataset_group)
488-
489-
with ExpandableProgress(display_progress=display_progress,
490-
provided_progress=provided_progress) as progress:
491-
return await self.submit_and_download(signed_urls_only=True,
492-
expandable_progress=progress,
493-
dataset_group=dataset_group)
521+
return await self.submit_and_download(
522+
signed_urls_only=True,
523+
expandable_progress=provided_progress,
524+
dataset_group=dataset_group,
525+
)
526+
527+
with ExpandableProgress(
528+
display_progress=display_progress, provided_progress=provided_progress
529+
) as progress:
530+
return await self.submit_and_download(
531+
signed_urls_only=True,
532+
expandable_progress=progress,
533+
dataset_group=dataset_group,
534+
)
494535

495536
as_signed_urls = make_sync(as_signed_urls_async)

servicex/servicex_client.py

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,19 @@ def get_codegen(_sample: Sample, _general: General):
5858
for sample in config.Sample:
5959
if sample.Query:
6060
if type(sample.Query) is str:
61-
qastle_query = qastle.python_ast_to_text_ast(ast.parse(sample.Query)) # NOQA E501
61+
qastle_query = qastle.python_ast_to_text_ast(
62+
ast.parse(sample.Query)
63+
) # NOQA E501
6264
sample.Query = FuncADLQuery()
6365

6466
sample.Query.set_provided_qastle(qastle_query)
6567

66-
query = sx.func_adl_dataset(sample.dataset_identifier, sample.Name,
67-
get_codegen(sample, config.General),
68-
config.General.OutputFormat)
68+
query = sx.func_adl_dataset(
69+
sample.dataset_identifier,
70+
sample.Name,
71+
get_codegen(sample, config.General),
72+
config.General.OutputFormat,
73+
)
6974
query._q_ast = sample.Query._q_ast
7075
query._item_type = sample.Query._item_type
7176
if sample.Tree:
@@ -86,9 +91,12 @@ def get_codegen(_sample: Sample, _general: General):
8691
except SyntaxError as e:
8792
raise SyntaxError(f"Syntax error in {sample.Name}: {e}")
8893

89-
dataset = sx.python_dataset(sample.dataset_identifier, sample.Name,
90-
get_codegen(sample, config.General),
91-
config.General.OutputFormat)
94+
dataset = sx.python_dataset(
95+
sample.dataset_identifier,
96+
sample.Name,
97+
get_codegen(sample, config.General),
98+
config.General.OutputFormat,
99+
)
92100
dataset.python_function = sample.Function
93101
dataset.ignore_cache = sample.IgnoreLocalCache
94102
datasets.append(dataset)
@@ -173,7 +181,7 @@ def get_code_generators(self, backend=None):
173181
if backend:
174182
cached_backends = self.query_cache.get_codegen_by_backend(backend)
175183
if cached_backends:
176-
rich.print("Returning code generators from cache")
184+
logging.getLogger(__name__).info("Returning code generators from cache")
177185
return cached_backends["codegens"]
178186
else:
179187
code_generators = self.servicex.get_code_generators()
@@ -187,7 +195,7 @@ def func_adl_dataset(
187195
codegen: str = "uproot",
188196
result_format: Optional[ResultFormat] = None,
189197
item_type: Type[T] = Any,
190-
ignore_cache: bool = False
198+
ignore_cache: bool = False,
191199
) -> FuncADLQuery[T]:
192200
r"""
193201
Generate a dataset that can use func_adl query language
@@ -217,7 +225,7 @@ def func_adl_dataset(
217225
query_cache=self.query_cache,
218226
result_format=result_format,
219227
item_type=item_type,
220-
ignore_cache=ignore_cache
228+
ignore_cache=ignore_cache,
221229
)
222230

223231
def python_dataset(
@@ -226,7 +234,7 @@ def python_dataset(
226234
title: str = "ServiceX Client",
227235
codegen: str = "uproot",
228236
result_format: Optional[ResultFormat] = None,
229-
ignore_cache: bool = False
237+
ignore_cache: bool = False,
230238
) -> PythonQuery:
231239
r"""
232240
Generate a dataset that can use accept a python function for the query
@@ -256,5 +264,5 @@ def python_dataset(
256264
config=self.config,
257265
query_cache=self.query_cache,
258266
result_format=result_format,
259-
ignore_cache=ignore_cache
267+
ignore_cache=ignore_cache,
260268
)

0 commit comments

Comments
 (0)