1 change: 0 additions & 1 deletion src/hats/io/file_io/__init__.py
@@ -10,7 +10,6 @@
     read_parquet_file_to_pandas,
     read_parquet_metadata,
     remove_directory,
-    unnest_headers_for_pandas,
     write_dataframe_to_csv,
     write_dataframe_to_parquet,
     write_fits_image,
52 changes: 23 additions & 29 deletions src/hats/io/file_io/file_io.py
@@ -2,6 +2,7 @@
 
 import tempfile
 from collections.abc import Generator
+from io import BytesIO
 from pathlib import Path
 
 import nested_pandas as npd
@@ -10,7 +11,6 @@
 import pyarrow.dataset as pds
 import pyarrow.parquet as pq
 import upath.implementations.http
-import yaml
 from cdshealpix.skymap.skymap import Skymap
 from pyarrow.dataset import Dataset
 from upath import UPath
@@ -148,6 +148,15 @@ def write_dataframe_to_parquet(dataframe: pd.DataFrame, file_pointer):
     dataframe.to_parquet(file_pointer.path, filesystem=file_pointer.fs)
 
 
+def _parquet_precache_all_bytes(file_pointer):  # pragma: no cover
+    if not isinstance(file_pointer, upath.implementations.http.HTTPPath):
+        return False
+    cache_options = file_pointer.fs.cache_options or {}
+    if "parquet_precache_all_bytes" not in cache_options:
+        return False
+    return cache_options["parquet_precache_all_bytes"]
+
+
 def read_parquet_metadata(file_pointer: str | Path | UPath, **kwargs) -> pq.FileMetaData:
     """Read FileMetaData from footer of a single Parquet file.
 
@@ -158,8 +167,10 @@ def read_parquet_metadata(file_pointer: str | Path | UPath, **kwargs) -> pq.FileMetaData:
     file_pointer = get_upath(file_pointer)
     if file_pointer is None or not file_pointer.exists():
         raise FileNotFoundError("Parquet file does not exist")
-    parquet_file = pq.read_metadata(file_pointer.path, filesystem=file_pointer.fs, **kwargs)
-    return parquet_file
+    if _parquet_precache_all_bytes(file_pointer):  # pragma: no cover
+        return pq.read_metadata(BytesIO(file_pointer.read_bytes()), **kwargs)
+
+    return pq.read_metadata(file_pointer.path, filesystem=file_pointer.fs, **kwargs)
 
 
 def read_parquet_file(file_pointer: str | Path | UPath, **kwargs) -> pq.ParquetFile:
@@ -172,6 +183,10 @@ def read_parquet_file(file_pointer: str | Path | UPath, **kwargs) -> pq.ParquetFile:
     file_pointer = get_upath(file_pointer)
     if file_pointer is None or not file_pointer.exists():
         raise FileNotFoundError("Parquet file does not exist")
+
+    if _parquet_precache_all_bytes(file_pointer):  # pragma: no cover
+        return pq.ParquetFile(BytesIO(file_pointer.read_bytes()), **kwargs)
+
     return pq.ParquetFile(file_pointer.path, filesystem=file_pointer.fs, **kwargs)
 
 
@@ -262,18 +277,6 @@ def write_fits_image(histogram: np.ndarray, map_file_pointer: str | Path | UPath):
             _map_file.write(_tmp_file.read())
 
 
-def read_yaml(file_handle: str | Path | UPath):
-    """Reads yaml file from filesystem.
-
-    Args:
-        file_handle: location of yaml file
-    """
-    file_handle = get_upath(file_handle)
-    with file_handle.open("r", encoding="utf-8") as _file:
-        metadata = yaml.safe_load(_file)
-    return metadata
-
-
 def delete_file(file_handle: str | Path | UPath):
     """Deletes file from filesystem.
 
@@ -284,19 +287,6 @@
     file_handle.unlink()
 
 
-def unnest_headers_for_pandas(storage_options: dict | None) -> dict | None:
-    """Handle storage options for pandas read/write methods.
-    This is needed because fsspec http storage options are nested under the "headers" key,
-    see https://github.com/astronomy-commons/hipscat/issues/295
-    """
-    if storage_options is not None and "headers" in storage_options:
-        # Copy the storage options to avoid modifying the original dictionary
-        storage_options_copy = storage_options.copy()
-        headers = storage_options_copy.pop("headers")
-        return {**storage_options_copy, **headers}
-    return storage_options
-
-
 def read_parquet_file_to_pandas(file_pointer: str | Path | UPath, **kwargs) -> npd.NestedFrame:
     """Reads parquet file(s) to a pandas DataFrame
 
@@ -310,14 +300,18 @@ def read_parquet_file_to_pandas(file_pointer: str | Path | UPath, **kwargs) -> npd.NestedFrame:
     file_pointer = get_upath(file_pointer)
     # If we are trying to read a directory over http, we need to send the explicit list of files instead.
     # We don't want to get the list unnecessarily because it can be expensive.
-    if isinstance(file_pointer, upath.implementations.http.HTTPPath) and file_pointer.is_dir():
+    if (
+        isinstance(file_pointer, upath.implementations.http.HTTPPath) and file_pointer.is_dir()
+    ):  # pragma: no cover
         file_pointers = [f for f in file_pointer.iterdir() if f.is_file()]
         return npd.read_parquet(
             file_pointers,
             filesystem=file_pointer.fs,
             partitioning=None,  # Avoid the ArrowTypeError described in #367
             **kwargs,
         )
+    if _parquet_precache_all_bytes(file_pointer):  # pragma: no cover
+        return npd.read_parquet(BytesIO(file_pointer.read_bytes()), partitioning=None, **kwargs)
     return npd.read_parquet(
         file_pointer.path,
         filesystem=file_pointer.fs,
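Note on the new `_parquet_precache_all_bytes` helper: the flag is read from the fsspec filesystem's `cache_options`, so a caller opts in when constructing the `UPath`. Below is a minimal usage sketch; the catalog URL is hypothetical, and it assumes `cache_options` is forwarded by universal_pathlib to fsspec's `HTTPFileSystem` as a storage option.

```python
from upath import UPath

from hats.io.file_io import read_parquet_metadata

# Hypothetical remote catalog file; any HTTP(S) URL behaves the same way.
# cache_options is passed through to fsspec's HTTPFileSystem, where the
# helper finds it as file_pointer.fs.cache_options.
metadata_path = UPath(
    "https://example.org/catalog/dataset/_metadata",
    cache_options={"parquet_precache_all_bytes": True},
)

# With the flag set, the whole file is fetched once via read_bytes() and
# parsed from an in-memory BytesIO, instead of issuing many small ranged
# HTTP reads against the Parquet footer and row groups.
metadata = read_parquet_metadata(metadata_path)
```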
25 changes: 0 additions & 25 deletions tests/hats/io/file_io/test_file_io.py
@@ -12,7 +12,6 @@
     read_parquet_dataset,
     read_parquet_file_to_pandas,
     remove_directory,
-    unnest_headers_for_pandas,
     write_dataframe_to_csv,
     write_fits_image,
     write_string_to_file,
@@ -160,27 +159,3 @@ def test_write_point_map_roundtrip(small_sky_order1_dir, tmp_path):
     write_fits_image(expected_counts_skymap, output_map_pointer)
     counts_skymap = read_fits_image(output_map_pointer)
     np.testing.assert_array_equal(counts_skymap, expected_counts_skymap)
-
-
-def test_unnest_headers_for_pandas():
-    storage_options = {
-        "headers": {"Authorization": "Bearer my_token"},
-    }
-    storage_options_str = {"Authorization": "Bearer my_token"}
-    assert storage_options_str == unnest_headers_for_pandas(storage_options)
-
-    storage_options = {
-        "key1": "value1",
-        "headers": {"Authorization": "Bearer my_token", "Content": "X"},
-    }
-    storage_options_str = {"key1": "value1", "Authorization": "Bearer my_token", "Content": "X"}
-    assert storage_options_str == unnest_headers_for_pandas(storage_options)
-
-    storage_options = {
-        "key1": "value1",
-        "key2": None,
-    }
-    storage_options_str = {"key1": "value1", "key2": None}
-    assert storage_options_str == unnest_headers_for_pandas(storage_options)
-
-    assert None is unnest_headers_for_pandas(None)