Skip to content
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ test = [
"numpy>=1.21",
"pyarrow>=8.0.0",
"pandas",
"miniopy-async==1.21.1"
"miniopy-async==1.21.1",
"scikit-hep-testdata",
]

docs = [
Expand Down
3 changes: 2 additions & 1 deletion servicex_analysis_utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from .materialization import to_awk
from .file_peeking import get_structure
from .read_buffers import get_necessary_branches

__version__ = "1.0.2"
__all__ = ["to_awk", "get_structure"]
__all__ = ["to_awk", "get_structure", "get_necessary_branches"]
59 changes: 47 additions & 12 deletions servicex_analysis_utils/file_peeking.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import numpy as np
import awkward as ak
import json
import logging


def run_query(input_filenames):
Expand Down Expand Up @@ -123,6 +124,30 @@ def build_deliver_spec(datasets):
return spec_python


def open_delivered_file(sample, path):
"""
Opens the first file delivered by ServiceX for a given sample and returns the
structure encoded in the "servicex/branch" branch.
If no files are found, logs a warning and returns None.
Parameters:
sample (str): The sample name for which to open the file.
path (list): List of file paths delivered by ServiceX for the sample.
"""

if not path:
logging.warning(
f"Warning: No files found for sample '{sample}' in delivered results. Skipping."
)
return None

try:
with uproot.open(path[0]) as f:
return f["servicex"]["branch"].array()[0]
except Exception as e:
logging.error(f"Error opening file for sample '{sample}': {e}")
return None


def print_structure_from_str(
deliver_dict, filter_branch="", save_to_txt=False, do_print=False
):
Expand All @@ -147,16 +172,18 @@ def print_structure_from_str(
)

for sample_name, path in deliver_dict.items():
structure_str = open_delivered_file(sample_name, path)
if structure_str is None:
continue
# Parse the JSON string into a dictionary
structure_dict = json.loads(structure_str)

output_lines.append(
f"\n---------------------------\n"
f"\U0001f4c1 Sample: {sample_name}\n"
f"---------------------------"
)

with uproot.open(path[0]) as f:
json_str = f["servicex"]["branch"].array()[0]
structure_dict = json.loads(json_str)

for tree_name, branches in structure_dict.items():
output_lines.append(f"\n\U0001f333 Tree: {tree_name}")
output_lines.append(" ├── Branches:")
Expand Down Expand Up @@ -212,7 +239,7 @@ def parse_jagged_depth_and_dtype(dtype_str):
return depth, None


def str_to_array(encoded_json_str):
def str_to_array(encoded_json_str, tree_forms=False):
"""
Helper to reconstruct ak.Arrays from a JSON-formatted file-structure string.
Returns an array mimicking TTrees and TBranches with correct field names and dtypes.
Expand All @@ -223,7 +250,7 @@ def str_to_array(encoded_json_str):
Returns:
ak.Array: An array containing a dictionary of trees with branch structures and dummy typed values.
"""
reconstructed_data = {}
reconstructed_file_data = {}
structure_dict = json.loads(encoded_json_str)

for treename, branch_dict in structure_dict.items():
Expand All @@ -249,9 +276,17 @@ def str_to_array(encoded_json_str):
branches[branch_name] = ak.Array([dummy])

if branches:
reconstructed_data[treename] = ak.Array([branches])
reconstructed_file_data[treename] = ak.Array([branches])

return ak.Array(reconstructed_data).type
if tree_forms:
# Dictionary of each tree's RecordForm
tree_forms = {
tree: arr.layout.form for tree, arr in reconstructed_file_data.items()
}
return tree_forms
else:
# ak.Array type with all trees and branches
return ak.Array(reconstructed_file_data).type


def get_structure(datasets, array_out=False, **kwargs):
Expand All @@ -270,13 +305,13 @@ def get_structure(datasets, array_out=False, **kwargs):
output = deliver(spec_python)

if array_out == True:
all_arrays = {}
all_requests = {}
for sample, path in output.items():
with uproot.open(path[0]) as f:
structure_str = f["servicex"]["branch"].array()[0]
sample_array = str_to_array(structure_str)
all_arrays[sample] = sample_array
return all_arrays
ak_structure = str_to_array(structure_str, **kwargs)
all_requests[sample] = ak_structure
return all_requests

else:
return print_structure_from_str(output, **kwargs)
109 changes: 109 additions & 0 deletions servicex_analysis_utils/read_buffers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# Copyright (c) 2025, IRIS-HEP
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import awkward as ak
from awkward.forms import RecordForm


def add_keys(original_form):
"""
Add a form_key to the parent Record and to every child ListOffset/content associated to a field.

Parameters:
original_form (ak.forms.Form): Form of the awkward array the typetracer will be built upon.
This should be a RecordForm, which contains fields and contents.

Returns:
RecordForm:
"""

# RecordForm: zip contents + field names, recurse on each child,
# then rebuild with each child's form_key set to its field name
if isinstance(original_form, RecordForm):
new_contents = []
for child, name in zip(original_form.contents, original_form.fields):
child = child.copy(form_key=name)
new_contents.append(child)
return RecordForm(
new_contents,
original_form.fields,
form_key="RecordForm_key", # Give name to parent RecordForm
parameters=original_form.parameters,
)

else:
raise ValueError(
f"Unsupported form type: {type(original_form)}. "
"This function only supports RecordForm."
)


def is_branch_buffer(form_key, attribute, form):
# rename any node that has no form_key
if form_key is None:
return "Not-a-branch"
return f"{form_key}"


def build_typetracer_with_report(array):
"""
Build a typetracer from an awkward form, adding keys to the RecordForm and ListOffsetForm.

Parameters:
form (ak.HighLevel.Array): The awkward array the typetracer will be built upon.

Returns:
tracer, report: ak.typetracer.TypeTracer: the typetracer built from the array form and the report.
"""
# Get the form from the array
form = array.layout.form
stamped_form = add_keys(form)

tracer, report = ak.typetracer.typetracer_with_report(
stamped_form,
buffer_key=is_branch_buffer,
highlevel=True,
behavior=None,
attrs=None,
)
return tracer, report


def get_necessary_branches(report):
"""
Utility function to get the necessary branches from a typetracer report.

Parameters:
report (ak.typetracer.Report): Report from the typetracer.

Returns:
list: List of necessary branches.
"""
return [
br for br in report.data_touched if br != "Not-a-branch"
] # filter out "Not-a-branch"
133 changes: 133 additions & 0 deletions tests/test_typetracer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import awkward as ak
from awkward.forms import ListOffsetForm, NumpyForm
import pytest
from servicex_analysis_utils import read_buffers
from skhep_testdata import data_path
import uproot


@pytest.fixture
def setup_form():
# Create a simple awkward array with a RecordForm
simple_array = ak.Array([{"x": [1, 2, 3], "y": [4, 5]}])

# Build a form from a .root file
file = data_path("uproot-Zmumu.root") + ":events"
uproot_array = uproot.open(file).arrays(library="ak")
return simple_array, uproot_array


@pytest.fixture
def setup_type_tracer(setup_form, from_uproot):
array_form, uproot_form = setup_form
if from_uproot:
form = uproot_form
else:
form = array_form
# Build a typetracer with the form
tracer, report = read_buffers.build_typetracer_with_report(form)
return tracer, report


@pytest.mark.parametrize("from_uproot", [True, False])
def test_built_typetracer_instance(setup_type_tracer):
tracer, report = setup_type_tracer
# Check if the tracer is an instance of high-level Array
assert isinstance(
tracer, ak.highlevel.Array
), f"Tracer should be a highlevel Array but is {type(tracer)}"

# Check if the report is an instance of Report
assert isinstance(report, ak.typetracer.TypeTracerReport), "Report is not a Report"
# Check if the report has data_touched
assert hasattr(
report, "data_touched"
), "Report does not have data_touched attribute"


@pytest.mark.parametrize("from_uproot", [False])
def test_simple_record_typetracer(setup_type_tracer):
tracer, report = setup_type_tracer

assert tracer.fields == ["x", "y"], "Fields of the tracer do not match expected"

# “Touch” one of the two fields
_ = tracer["x"] + 0

# Collect the touched branches
touched_branches = read_buffers.get_necessary_branches(report)
assert set(touched_branches) == {"x"}


@pytest.mark.parametrize("from_uproot", [True])
def test_root_record_typetracer(setup_type_tracer):
tracer, report = setup_type_tracer
expected_fields = [
"Type",
"Run",
"Event",
"E1",
"px1",
"py1",
"pz1",
"pt1",
"eta1",
"phi1",
"Q1",
"E2",
"px2",
"py2",
"pz2",
"pt2",
"eta2",
"phi2",
"Q2",
"M",
]

assert (
tracer.fields == expected_fields
), "Fields of the tracer do not match expected"

# Compute deltaR on typetracer
delta_r = (
(tracer["eta1"] - tracer["eta2"]) ** 2 + (tracer["phi1"] - tracer["phi2"]) ** 2
) ** 0.5

# Check delta_r is still an array
assert isinstance(
delta_r, ak.highlevel.Array
), f"delta_r should be a highlevel Array but is {type(delta_r)}"

# Check no data is loaded in computation
# only TypeTracer placeholders should be present
first_element = delta_r[0]
assert "TypeTracer" in repr(
first_element
), f"Expected a TypeTracer placeholder, got {repr(first_element)} instead."

# Collect the touched branches
touched_branches = read_buffers.get_necessary_branches(report)
assert set(touched_branches) == {
"eta1",
"phi1",
"eta2",
"phi2",
}, "Touched branches do not match expected branches"


def test_error_on_wrong_form():
# Test that ValueError is raised when an empty form is passed
with pytest.raises(
ValueError,
match="Unsupported form type: <class 'NoneType'>. This function only supports RecordForm.",
):
read_buffers.add_keys(None)

# Test that ValueError is raised when another form type is passed
dummy = ListOffsetForm("i64", NumpyForm("int32"))
with pytest.raises(
ValueError,
match="Unsupported form type: <class 'awkward.forms.listoffsetform.ListOffsetForm'>. This function only supports RecordForm.",
):
read_buffers.add_keys(dummy)