def open_delivered_file(sample, path):
    """
    Open the first file delivered by ServiceX for a sample and return the
    structure string stored in its "servicex" -> "branch" array.

    Parameters:
        sample (str): The sample name; used only in the log messages.
        path (list): File paths delivered by ServiceX for the sample.

    Returns:
        The first entry of the "branch" array, or None when `path` is empty
        or when opening/reading the file raises (the problem is logged).
    """
    # Guard clause: nothing was delivered for this sample.
    if not path:
        # Arguments are passed to logging instead of pre-formatting an f-string,
        # so the message is only built when the record is actually emitted.
        logging.warning(
            "Warning: No files found for sample '%s' in delivered results. Skipping.",
            sample,
        )
        return None

    try:
        # Only the first delivered file is inspected.
        with uproot.open(path[0]) as f:
            return f["servicex"]["branch"].array()[0]
    except Exception as e:
        # Deliberate best-effort: a bad file is logged and reported as None
        # so the caller can skip this sample and continue with the others.
        logging.error("Error opening file for sample '%s': %s", sample, e)
        return None
""" - reconstructed_data = {} + reconstructed_file_data = {} structure_dict = json.loads(encoded_json_str) for treename, branch_dict in structure_dict.items(): @@ -249,9 +276,17 @@ def str_to_array(encoded_json_str): branches[branch_name] = ak.Array([dummy]) if branches: - reconstructed_data[treename] = ak.Array([branches]) + reconstructed_file_data[treename] = ak.Array([branches]) - return ak.Array(reconstructed_data).type + if tree_forms: + # Dictionary of each tree's RecordForm + tree_forms = { + tree: arr.layout.form for tree, arr in reconstructed_file_data.items() + } + return tree_forms + else: + # ak.Array type with all trees and branches + return ak.Array(reconstructed_file_data).type def get_structure(datasets, array_out=False, **kwargs): @@ -270,13 +305,13 @@ def get_structure(datasets, array_out=False, **kwargs): output = deliver(spec_python) if array_out == True: - all_arrays = {} + all_requests = {} for sample, path in output.items(): with uproot.open(path[0]) as f: structure_str = f["servicex"]["branch"].array()[0] - sample_array = str_to_array(structure_str) - all_arrays[sample] = sample_array - return all_arrays + ak_structure = str_to_array(structure_str, **kwargs) + all_requests[sample] = ak_structure + return all_requests else: return print_structure_from_str(output, **kwargs) diff --git a/servicex_analysis_utils/read_buffers.py b/servicex_analysis_utils/read_buffers.py new file mode 100644 index 0000000..da3a368 --- /dev/null +++ b/servicex_analysis_utils/read_buffers.py @@ -0,0 +1,109 @@ +# Copyright (c) 2025, IRIS-HEP +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. 
#
# * Redistributions in binary form must reproduce the above copyright notice,
#   this list of conditions and the following disclaimer in the documentation
#   and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
#   contributors may be used to endorse or promote products derived from
#   this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

# Label given to buffers whose node carries no form_key. It is produced by
# is_branch_buffer and filtered out again by get_necessary_branches, so both
# functions must agree on it.
_NOT_A_BRANCH = "Not-a-branch"


def add_keys(original_form):
    """
    Return a copy of a RecordForm whose direct children are keyed by field name.

    Each top-level content is copied with form_key set to the name of its field.
    Nested contents are not re-keyed (there is no recursion). The parent record
    is rebuilt with form_key "RecordForm_key" and the original parameters.

    Parameters:
        original_form (ak.forms.Form): Form of the awkward array the typetracer
            will be built upon. Must be a RecordForm.

    Returns:
        RecordForm: The re-keyed form, with the same fields as the input.

    Raises:
        ValueError: If original_form is not a RecordForm.
    """
    # Imported at call time so this block does not rely on module-level names.
    from awkward.forms import RecordForm

    # Guard clause: anything but a RecordForm is rejected up front.
    if not isinstance(original_form, RecordForm):
        raise ValueError(
            f"Unsupported form type: {type(original_form)}. "
            "This function only supports RecordForm."
        )

    keyed_children = [
        child.copy(form_key=name)
        for child, name in zip(original_form.contents, original_form.fields)
    ]
    return RecordForm(
        keyed_children,
        original_form.fields,
        form_key="RecordForm_key",  # Give name to parent RecordForm
        parameters=original_form.parameters,
    )


def is_branch_buffer(form_key, attribute, form):
    """
    Buffer-key callback: label a buffer with the form_key of its node.

    Parameters:
        form_key: form_key of the node owning the buffer, or None.
        attribute: Not used by this callback.
        form: Not used by this callback.

    Returns:
        str: str(form_key), or "Not-a-branch" when form_key is None.
    """
    # NOTE(review): parameter names are left untouched because the callback is
    # handed to awkward as `buffer_key` and may be invoked with keyword
    # arguments - confirm against the awkward documentation before renaming.
    if form_key is None:
        return _NOT_A_BRANCH
    return str(form_key)


def build_typetracer_with_report(array):
    """
    Build a typetracer and its report from the form of an awkward array.

    The array's layout form is first stamped with add_keys, then passed to
    ak.typetracer.typetracer_with_report with is_branch_buffer as buffer_key.

    Parameters:
        array (ak.Array): The awkward array the typetracer will be built upon.
            Its layout form must be a RecordForm (see add_keys).

    Returns:
        tuple: (tracer, report) as produced by typetracer_with_report.

    Raises:
        ValueError: Propagated from add_keys when the form is not a RecordForm.
    """
    # Imported at call time so this block does not rely on module-level names.
    import awkward as ak

    stamped_form = add_keys(array.layout.form)

    tracer, report = ak.typetracer.typetracer_with_report(
        stamped_form,
        buffer_key=is_branch_buffer,
        highlevel=True,
        behavior=None,
        attrs=None,
    )
    return tracer, report


def get_necessary_branches(report):
    """
    Utility function to get the necessary branches from a typetracer report.

    Parameters:
        report: Object exposing a `data_touched` iterable of buffer labels,
            e.g. the report returned by build_typetracer_with_report.

    Returns:
        list: Entries of report.data_touched, in order, without the
            "Not-a-branch" placeholder label.
    """
    return [label for label in report.data_touched if label != _NOT_A_BRANCH]
import re

import awkward as ak
from awkward.forms import ListOffsetForm, NumpyForm
import pytest
from servicex_analysis_utils import read_buffers
from skhep_testdata import data_path
import uproot


def _unsupported_form_pattern(obj):
    """Return a regex that matches add_keys' error message for `obj` literally."""
    # The message embeds type(obj), whose text contains regex metacharacters
    # ('<', '.', "'"), so the whole message is escaped before being used
    # as a `match` pattern.
    return re.escape(
        f"Unsupported form type: {type(obj)}. "
        "This function only supports RecordForm."
    )


@pytest.fixture
def setup_form():
    """Provide a small in-memory record array and the Zmumu "events" arrays."""
    # Create a simple awkward array with a RecordForm
    simple_array = ak.Array([{"x": [1, 2, 3], "y": [4, 5]}])

    # Read the arrays from a .root file; the context manager closes the file
    # once the arrays have been loaded.
    file = data_path("uproot-Zmumu.root") + ":events"
    with uproot.open(file) as events:
        uproot_array = events.arrays(library="ak")
    return simple_array, uproot_array


@pytest.fixture
def setup_type_tracer(setup_form, from_uproot):
    """Build (tracer, report) from either fixture array, chosen by `from_uproot`."""
    simple_array, uproot_array = setup_form
    source_array = uproot_array if from_uproot else simple_array
    # Build a typetracer from the selected array
    tracer, report = read_buffers.build_typetracer_with_report(source_array)
    return tracer, report


@pytest.mark.parametrize("from_uproot", [True, False])
def test_built_typetracer_instance(setup_type_tracer):
    tracer, report = setup_type_tracer
    # Check if the tracer is an instance of high-level Array
    assert isinstance(
        tracer, ak.highlevel.Array
    ), f"Tracer should be a highlevel Array but is {type(tracer)}"

    # Check if the report is an instance of Report
    assert isinstance(report, ak.typetracer.TypeTracerReport), "Report is not a Report"
    # Check if the report has data_touched
    assert hasattr(
        report, "data_touched"
    ), "Report does not have data_touched attribute"


@pytest.mark.parametrize("from_uproot", [False])
def test_simple_record_typetracer(setup_type_tracer):
    tracer, report = setup_type_tracer

    assert tracer.fields == ["x", "y"], "Fields of the tracer do not match expected"

    # "Touch" one of the two fields
    _ = tracer["x"] + 0

    # Collect the touched branches
    touched_branches = read_buffers.get_necessary_branches(report)
    assert set(touched_branches) == {"x"}


@pytest.mark.parametrize("from_uproot", [True])
def test_root_record_typetracer(setup_type_tracer):
    tracer, report = setup_type_tracer
    expected_fields = [
        "Type",
        "Run",
        "Event",
        "E1",
        "px1",
        "py1",
        "pz1",
        "pt1",
        "eta1",
        "phi1",
        "Q1",
        "E2",
        "px2",
        "py2",
        "pz2",
        "pt2",
        "eta2",
        "phi2",
        "Q2",
        "M",
    ]

    assert (
        tracer.fields == expected_fields
    ), "Fields of the tracer do not match expected"

    # Compute deltaR on typetracer
    delta_r = (
        (tracer["eta1"] - tracer["eta2"]) ** 2 + (tracer["phi1"] - tracer["phi2"]) ** 2
    ) ** 0.5

    # Check delta_r is still an array
    assert isinstance(
        delta_r, ak.highlevel.Array
    ), f"delta_r should be a highlevel Array but is {type(delta_r)}"

    # Check no data is loaded in computation
    # only TypeTracer placeholders should be present
    first_element = delta_r[0]
    assert "TypeTracer" in repr(
        first_element
    ), f"Expected a TypeTracer placeholder, got {repr(first_element)} instead."

    # Collect the touched branches
    touched_branches = read_buffers.get_necessary_branches(report)
    assert set(touched_branches) == {
        "eta1",
        "phi1",
        "eta2",
        "phi2",
    }, "Touched branches do not match expected branches"


def test_error_on_wrong_form():
    # Test that ValueError is raised when an empty form is passed
    with pytest.raises(ValueError, match=_unsupported_form_pattern(None)):
        read_buffers.add_keys(None)

    # Test that ValueError is raised when another form type is passed
    dummy = ListOffsetForm("i64", NumpyForm("int32"))
    with pytest.raises(ValueError, match=_unsupported_form_pattern(dummy)):
        read_buffers.add_keys(dummy)