gnomad/utils/file_utils.py (88 changes: 1 addition, 87 deletions)
@@ -1,21 +1,14 @@
 # noqa: D100
 
-import asyncio
 import base64
 import gzip
 import logging
 import os
 import subprocess
 import uuid
-from concurrent.futures import ThreadPoolExecutor
-from typing import Callable, Dict, List, Optional, Tuple, Union
+from typing import List, Optional, Tuple, Union
 
 import hail as hl
-from hailtop.aiocloud.aiogoogle import GoogleStorageAsyncFS
-from hailtop.aiotools import AsyncFS, LocalAsyncFS
-from hailtop.aiotools.router_fs import RouterAsyncFS
-from hailtop.utils import bounded_gather
-from hailtop.utils.rich_progress_bar import SimpleCopyToolProgressBar
 
 from gnomad.resources.resource_utils import DataException
 
@@ -24,85 +17,6 @@
 logger.setLevel(logging.INFO)
 
 
-async def parallel_file_exists_async(
-    fpaths: List[str], parallelism: int = 750
-) -> Dict[str, bool]:
-    """
-    Check whether a large number of files exist.
-
-    Created for use with hail Batch jobs.
-    Normal `file_exists` function is very slow when checking a large number of files.
-
-    :param fpaths: List of file paths to check. Files can be in local or Google cloud storage.
-    :param parallelism: Integer that sets parallelism of file existence checking task. Default is 750.
-    :return: Dictionary of file paths (str) and whether the file exists (boolean).
-    """
-
-    async def async_file_exists(fs: AsyncFS, fpath: str) -> bool:
-        """
-        Determine file existence.
-
-        :param fs: AsyncFS object.
-        :param fpath: Path to file to check.
-        :return: Whether file exists.
-        """
-        fext = os.path.splitext(fpath)[1]
-        if fext in [".ht", ".mt"]:
-            fpath += "/_SUCCESS"
-        try:
-            await fs.statfile(fpath)
-        except FileNotFoundError:
-            return False
-        else:
-            return True
-
-    with SimpleCopyToolProgressBar(
-        total=len(fpaths), description="check files for existence", disable=False
-    ) as pbar:
-        with ThreadPoolExecutor() as thread_pool:
-            async with RouterAsyncFS(
-                filesystems=[LocalAsyncFS(thread_pool), GoogleStorageAsyncFS()]
-            ) as fs:
-
-                def check_existence_and_update_pbar_thunk(fpath: str) -> Callable:
-                    """
-                    Create function to check if file exists and update progress bar in stdout.
-
-                    Function delays coroutine creation to avoid creating too many live coroutines.
-
-                    :param fpath: Path to file to check.
-                    :return: Function that checks for file existence and updates progress bar.
-                    """
-
-                    async def unapplied_function():
-                        x = await async_file_exists(fs, fpath)
-                        pbar.update(1)
-                        return x
-
-                    return unapplied_function
-
-                file_existence_checks = [
-                    check_existence_and_update_pbar_thunk(fpath) for fpath in fpaths
-                ]
-                file_existence = await bounded_gather(
-                    *file_existence_checks, parallelism=parallelism
-                )
-    return dict(zip(fpaths, file_existence))
-
-
-def parallel_file_exists(fpaths: List[str], parallelism: int = 750) -> Dict[str, bool]:
-    """
-    Call `parallel_file_exists_async` to check whether large number of files exist.
-
-    :param fpaths: List of file paths to check. Files can be in local or Google cloud storage.
-    :param parallelism: Integer that sets parallelism of file existence checking task. Default is 750.
-    :return: Dictionary of file paths (str) and whether the file exists (boolean).
-    """
-    return asyncio.get_event_loop().run_until_complete(
-        parallel_file_exists_async(fpaths, parallelism)
-    )
-
-
 def file_exists(fname: str) -> bool:
     """
     Check whether a file exists.
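
For context, a minimal sketch of how the removed helper was typically called before this change; the bucket name and shard paths below are hypothetical, and the snippet assumes the pre-removal version of gnomad/utils/file_utils.py shown in the diff above:

    from gnomad.utils.file_utils import parallel_file_exists

    # Hypothetical shard paths; local paths and gs:// paths are handled the same way.
    vcf_paths = [f"gs://my-bucket/shard_{i}.vcf.bgz" for i in range(1000)]

    # Returns a dict mapping each path to True/False. Paths ending in .ht or .mt
    # are checked via their /_SUCCESS marker file, as in async_file_exists above.
    existence = parallel_file_exists(vcf_paths, parallelism=750)
    missing = [path for path, found in existence.items() if not found]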