Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 62 additions & 7 deletions gnomad/utils/file_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
from hailtop.aiogoogle import GoogleStorageAsyncFS
from hailtop.utils import bounded_gather, tqdm

from gnomad.resources.resource_utils import DataException

logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s")
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
Expand Down Expand Up @@ -105,18 +107,71 @@ def file_exists(fname: str) -> bool:
Check whether a file exists.

Supports either local or Google cloud (gs://) paths.
If the file is a Hail file (.ht, .mt extensions), it checks that _SUCCESS is present.
If the file is a Hail file (.ht, .mt, .bm, .parquet, and .vds extensions), it checks that _SUCCESS is present.

:param fname: File name
:return: Whether the file exists
:param fname: File name.
:return: Whether the file exists.
"""
fext = os.path.splitext(fname)[1]
if fext in [".ht", ".mt"]:
fname += "/_SUCCESS"
if fext in {".ht", ".mt", ".bm", ".parquet"}:
paths = [f"{fname}/_SUCCESS"]

if fext == ".vds":
paths = [f"{fname}/reference_data/_SUCCESS", f"{fname}/variant_data/_SUCCESS"]

if fname.startswith("gs://"):
return hl.hadoop_exists(fname)
exists_func = hl.hadoop_exists
else:
return os.path.isfile(fname)
exists_func = os.path.isfile

exists = all([exists_func(p) for p in paths])

return exists


def check_file_exists_raise_error(
fname: Union[str, List[str]],
error_if_exists: bool = False,
error_if_not_exists: bool = False,
error_if_exists_msg: str = "The following files already exist: ",
error_if_not_exists_msg: str = "The following files do not exist: ",
) -> bool:
"""
Check whether the file or all files in a list of files exist and optionally raise an exception.

This can be useful when writing out to files at the end of a pipeline to first check if the file already
exists and therefore requires the file to be removed or overwrite specified so the pipeline doesn't fail.

:param fname: File path, or list of file paths to check the existence of.
:param error_if_exists: Whether to raise an exception if any of the files exist. Default is True.
:param error_if_not_exists: Whether to raise an exception if any of the files do not exist. Default is False.
:param error_if_exists_msg: String of the error message to print if any of the files exist.
:param error_if_not_exists_msg: String of the error message to print if any of the files do not exist.
:return: Boolean indicating if `fname` or all files in `fname` exist.
"""
if isinstance(fname, str):
fname = [fname]

all_exist = True
exist = []
not_exist = []
for f in fname:
exists = file_exists(f)
all_exist &= exists
if exists and error_if_exists:
exist.append(f)
if not exists and error_if_not_exists:
not_exist.append(f)

error_msg = ""
if exist:
error_msg = error_if_exists_msg + ", ".join(exist)
if not_exist:
error_msg = error_msg + "\n" + error_if_not_exists_msg + ", ".join(not_exist)
if error_msg:
raise DataException(error_msg)

return all_exist


def write_temp_gcs(
Expand Down