def gnomad_gks(
    locus_interval: hl.IntervalExpression,
    version: str,
    data_type: str = "genomes",
    by_ancestry_group: bool = False,
    by_sex: bool = False,
    vrs_only: bool = False,
    custom_ht: Optional[hl.Table] = None,
    skip_checkpoint: bool = False,
    skip_coverage: bool = False,
    custom_coverage_ht: Optional[hl.Table] = None,
) -> list:
    """
    Perform gnomad GKS annotations on a range of variants at once.

    :param locus_interval: Hail IntervalExpression of locus.
        e.g. hl.locus_interval('chr1', 6424776, 6461367, reference_genome="GRCh38")
    :param version: String of version of gnomAD release to use.
    :param data_type: String of either "exomes" or "genomes" for the type of reads that are desired.
    :param by_ancestry_group: Boolean to pass to obtain frequency information for each cohort.
    :param by_sex: Boolean to pass to return frequency information for each cohort split by chromosomal sex.
    :param vrs_only: Boolean to pass for only VRS info to be returned
        (will not include allele frequency information).
    :param custom_ht: Table to use instead of what public_release() method would return for the version.
    :param skip_checkpoint: Bool to pass to skip checkpointing selected fields
        (checkpointing may be desirable for large datasets by reducing data copies across the cluster).
    :param skip_coverage: Bool to pass to skip adding coverage statistics.
    :param custom_coverage_ht: Custom table to use for coverage statistics instead of the release coverage table.
    :return: List of dictionaries containing VRS information
        (and freq info split by ancestry groups and sex if desired) for specified variant.
    """
    # Read public_release table if no custom table provided
    if custom_ht:
        ht = custom_ht
    else:
        ht = hl.read_table(public_release(data_type).versions[version].path)

    high_level_version = f"v{version.split('.')[0]}"

    # Read coverage statistics if requested
    if high_level_version == "v3":
        coverage_version = "3.0.1"
    else:
        raise NotImplementedError(
            "gnomad_gks() is currently only implemented for gnomAD v3."
        )

    coverage_ht = None

    if not skip_coverage:
        if custom_coverage_ht:
            coverage_ht = custom_coverage_ht
        else:
            coverage_ht = hl.read_table(
                coverage("genomes").versions[coverage_version].path
            )
        # Join coverage by locus; per-variant mean depth is surfaced later
        # through add_gks_va's ancillaryResults.
        ht = ht.annotate(mean_depth=coverage_ht[ht.locus].mean)

    # Retrieve ancestry groups from the imported POPS dictionary.
    pops_list = list(POPS[high_level_version]) if by_ancestry_group else None

    # Throw warnings if contradictory arguments are passed.
    if by_ancestry_group and vrs_only:
        logger.warning(
            "Both 'vrs_only' and 'by_ancestry_groups' have been specified. Ignoring"
            " 'by_ancestry_groups' list and returning only VRS information."
        )
    elif by_sex and not by_ancestry_group:
        logger.warning(
            "Splitting whole database by sex is not yet supported. If using 'by_sex',"
            " please also specify 'by_ancestry_group' to stratify by."
        )

    # Call and return add_gks_vrs and add_gks_va for chosen arguments.

    # Select relevant fields, checkpoint, and filter to interval before adding
    # annotations.
    # faf95: sort filtering-allele-frequency entries for the FAF_POPS descending
    # by faf95 (ties broken by population name) and keep the top entry as the
    # "popmax" FAF; a popmax of exactly 0 is reported with a missing population.
    ht = ht.annotate(
        faf95=hl.rbind(
            hl.sorted(
                hl.array(
                    [
                        hl.struct(
                            faf=ht.faf[ht.faf_index_dict[f"{pop}-adj"]].faf95,
                            population=pop,
                        )
                        for pop in FAF_POPS
                    ]
                ),
                key=lambda f: (-f.faf, f.population),
            ),
            lambda fafs: hl.if_else(
                hl.len(fafs) > 0,
                hl.struct(
                    popmax=fafs[0].faf,
                    popmax_population=hl.if_else(
                        fafs[0].faf == 0, hl.missing(hl.tstr), fafs[0].population
                    ),
                ),
                hl.struct(
                    popmax=hl.missing(hl.tfloat), popmax_population=hl.missing(hl.tstr)
                ),
            ),
        )
    )

    keep_fields = [ht.freq, ht.info.vrs, ht.faf95]

    if not skip_coverage:
        keep_fields.append(ht.mean_depth)

    ht = ht.select(*keep_fields)

    # Checkpoint narrower set of columns if not skipped.
    ht = hl.filter_intervals(ht, [locus_interval])
    if not skip_checkpoint:
        ht = ht.checkpoint(hl.utils.new_temp_file("vrs_checkpoint", extension="ht"))

    # Collect all variants as structs, so all dictionary construction can be
    # done in native Python
    variant_list = ht.collect()
    ht_freq_index_dict = ht.freq_index_dict.collect()[0]

    # Assemble output dicts with VRS and optionally frequency, append to list,
    # then return list
    outputs = []
    for variant in variant_list:
        vrs_variant = add_gks_vrs(variant.locus, variant.vrs)

        out = {
            "locus": {
                "contig": variant.locus.contig,
                "position": variant.locus.position,
                "reference_genome": variant.locus.reference_genome.name,
            },
            "alleles": variant.alleles,
            "gks_vrs_variant": vrs_variant,
        }

        if not vrs_only:
            va_freq_dict = add_gks_va(
                input_struct=variant,
                label_name="gnomAD",
                label_version=version,
                ancestry_groups=pops_list,
                ancestry_groups_dict=POP_NAMES,
                by_sex=by_sex,
                freq_index_dict=ht_freq_index_dict,
            )

            # Assign existing VRS information to "focusAllele" key
            va_freq_dict["focusAllele"] = vrs_variant
            out["gks_va_freq"] = va_freq_dict

        # Append variant dictionary to list of outputs
        outputs.append(out)

    return outputs
"ga4gh:SQ.pnAqCRBrTsUoBghSD1yp_jXWSmlbdh4g", + "chr3": "ga4gh:SQ.Zu7h9AggXxhTaGVsy7h_EZSChSZGcmgX", + "chr4": "ga4gh:SQ.HxuclGHh0XCDuF8x6yQrpHUBL7ZntAHc", + "chr5": "ga4gh:SQ.aUiQCzCPZ2d0csHbMSbh2NzInhonSXwI", + "chr6": "ga4gh:SQ.0iKlIQk2oZLoeOG9P1riRU6hvL5Ux8TV", + "chr7": "ga4gh:SQ.F-LrLMe1SRpfUZHkQmvkVKFEGaoDeHul", + "chr8": "ga4gh:SQ.209Z7zJ-mFypBEWLk4rNC6S_OxY5p7bs", + "chr9": "ga4gh:SQ.KEO-4XBcm1cxeo_DIQ8_ofqGUkp4iZhI", + "chr10": "ga4gh:SQ.ss8r_wB0-b9r44TQTMmVTI92884QvBiB", + "chr11": "ga4gh:SQ.2NkFm8HK88MqeNkCgj78KidCAXgnsfV1", + "chr12": "ga4gh:SQ.6wlJpONE3oNb4D69ULmEXhqyDZ4vwNfl", + "chr13": "ga4gh:SQ._0wi-qoDrvram155UmcSC-zA5ZK4fpLT", + "chr14": "ga4gh:SQ.eK4D2MosgK_ivBkgi6FVPg5UXs1bYESm", + "chr15": "ga4gh:SQ.AsXvWL1-2i5U_buw6_niVIxD6zTbAuS6", + "chr16": "ga4gh:SQ.yC_0RBj3fgBlvgyAuycbzdubtLxq-rE0", + "chr17": "ga4gh:SQ.dLZ15tNO1Ur0IcGjwc3Sdi_0A6Yf4zm7", + "chr18": "ga4gh:SQ.vWwFhJ5lQDMhh-czg06YtlWqu0lvFAZV", + "chr19": "ga4gh:SQ.IIB53T8CNeJJdUqzn9V_JnRtQadwWCbl", + "chr20": "ga4gh:SQ.-A1QmD_MatoqxvgVxBLZTONHz9-c7nQo", + "chr21": "ga4gh:SQ.5ZUqxCmDDgN4xTRbaSjN8LwgZironmB8", + "chr22": "ga4gh:SQ.7B7SHsmchAR0dFcDCuSFjJAo7tX87krQ", + "chrX": "ga4gh:SQ.w0WZEvgJF0zf_P4yyTzjjv9oW1z61HHP", + "chrY": "ga4gh:SQ.8_liLu1aycC0tPQPFmUaGXJLDs5SbPZ5", + }, + "GRCh37": { + "1": "ga4gh:SQ.S_KjnFVz-FE7M0W6yoaUDgYxLPc1jyWU", + "2": "ga4gh:SQ.9KdcA9ZpY1Cpvxvg8bMSLYDUpsX6GDLO", + "3": "ga4gh:SQ.VNBualIltAyi2AI_uXcKU7M9XUOuA7MS", + "4": "ga4gh:SQ.iy7Zfceb5_VGtTQzJ-v5JpPbpeifHD_V", + "5": "ga4gh:SQ.vbjOdMfHJvTjK_nqvFvpaSKhZillW0SX", + "6": "ga4gh:SQ.KqaUhJMW3CDjhoVtBetdEKT1n6hM-7Ek", + "7": "ga4gh:SQ.IW78mgV5Cqf6M24hy52hPjyyo5tCCd86", + "8": "ga4gh:SQ.tTm7wmhz0G4lpt8wPspcNkAD_qiminj6", + "9": "ga4gh:SQ.HBckYGQ4wYG9APHLpjoQ9UUe9v7NxExt", + "10": "ga4gh:SQ.-BOZ8Esn8J88qDwNiSEwUr5425UXdiGX", + "11": "ga4gh:SQ.XXi2_O1ly-CCOi3HP5TypAw7LtC6niFG", + "12": "ga4gh:SQ.105bBysLoDFQHhajooTAUyUkNiZ8LJEH", + "13": "ga4gh:SQ.Ewb9qlgTqN6e_XQiRVYpoUfZJHXeiUfH", + "14": 
"ga4gh:SQ.5Ji6FGEKfejK1U6BMScqrdKJK8GqmIGf", + "15": "ga4gh:SQ.zIMZb3Ft7RdWa5XYq0PxIlezLY2ccCgt", + "16": "ga4gh:SQ.W6wLoIFOn4G7cjopxPxYNk2lcEqhLQFb", + "17": "ga4gh:SQ.AjWXsI7AkTK35XW9pgd3UbjpC3MAevlz", + "18": "ga4gh:SQ.BTj4BDaaHYoPhD3oY2GdwC_l0uqZ92UD", + "19": "ga4gh:SQ.ItRDD47aMoioDCNW_occY5fWKZBKlxCX", + "20": "ga4gh:SQ.iy_UbUrvECxFRX5LPTH_KPojdlT7BKsf", + "21": "ga4gh:SQ.LpTaNW-hwuY_yARP0rtarCnpCQLkgVCg", + "22": "ga4gh:SQ.XOgHwwR3Upfp5sZYk6ZKzvV25a4RBVu8", + "X": "ga4gh:SQ.v7noePfnNpK8ghYXEqZ9NukMXW7YeNsm", + "Y": "ga4gh:SQ.BT7QyW5iXaX_1PSX-msSGYsqRdMKqkj-", + }, +} + def pop_max_expr( freq: hl.expr.ArrayExpression, @@ -904,8 +965,7 @@ def region_flag_expr( :return: `region_flag` struct row annotation """ prob_flags_expr = ( - {"non_par": (t.locus.in_x_nonpar() | t.locus.in_y_nonpar()) - } if non_par else {} # fmt: skip + {"non_par": t.locus.in_x_nonpar() | t.locus.in_y_nonpar()} if non_par else {} ) if prob_regions is not None: @@ -1868,3 +1928,299 @@ def _update_struct( ) return ht.annotate(**updated_rows) + + +def gks_compute_seqloc_digest( + ht: hl.Table, + export_tmpfile: str = new_temp_file("gks-seqloc-pre.tsv"), + computed_tmpfile: str = new_temp_file("gks-seqloc-post.tsv"), +): + """ + Compute sequence location digest-based id for a hail variant Table. + + Exports table to tsv, computes SequenceLocation digests, reimports and replaces + the vrs_json field with the result. Input table must have a .vrs field, like the + one added by add_gks_vrs, that can be used to construct ga4gh.vrs models. + + :param ht: hail table with VRS annotation + :param export_tmpfile: file path to export the table to. 
+ :param computed_tmpfile: file path to write the updated rows to, + which is then imported as a hail table + :return: a hail table with the VRS annotation updated with the new SequenceLocations + """ + logger.info("Exporting ht to %s", export_tmpfile) + ht.select("vrs_json").export(export_tmpfile, header=True) + + logger.info( + "Computing SequenceLocation digests and writing to %s", computed_tmpfile + ) + start = timer() + counter = 0 + with open(computed_tmpfile, "w", encoding="utf-8") as f_out: + with open(export_tmpfile, "r", encoding="utf-8") as f: + reader = csv.reader(f, delimiter="\t") + header = None + for line in reader: + if header is None: + header = line + f_out.write("\t".join(header)) + f_out.write("\n") + continue + else: + locus, alleles, vrs_json = line + vrs_variant = json.loads(vrs_json) + location = vrs_variant["location"] + location.pop("_id") + location_id = ga4gh_core._internal.identifiers.ga4gh_identify( + ga4gh_vrs.models.SequenceLocation(**location) + ) + vrs_variant["location"]["_id"] = location_id + # serialize outputs to JSON and write to TSV + vrs_json = json.dumps(vrs_variant) + alleles = json.dumps(json.loads(alleles)) + f_out.write("\t".join([locus, alleles, vrs_json])) + f_out.write("\n") + counter += 1 + end = timer() + logger.info( + "Computed %s SequenceLocation digests in %s seconds", counter, (end - start) + ) + logger.info("Importing VRS records with computed SequenceLocation digests") + ht_with_location = hl.import_table( + computed_tmpfile, types={"locus": "tstr", "alleles": "tstr", "vrs_json": "tstr"} + ) + ht_with_location_parsed = ht_with_location.annotate( + locus=hl.locus( + contig=ht_with_location.locus.split(":")[0], + pos=hl.int32(ht_with_location.locus.split(":")[1]), + reference_genome="GRCh38", + ), + alleles=hl.parse_json(ht_with_location.alleles, dtype=hl.tarray(hl.tstr)), + ).key_by("locus", "alleles") + + return ht.drop("vrs_json").join(ht_with_location_parsed, how="left") + + +def add_gks_vrs( + 
def add_gks_vrs(
    input_locus: hl.locus,
    input_vrs: hl.struct,
) -> dict:
    """
    Generate a dictionary containing VRS information from a given locus and struct of VRS information.

    Dict will have GA4GH GKS VRS structure.

    :param input_locus: Locus field from a struct (locus of result of running .collect() on a Hail table).
    :param input_vrs: VRS struct (such as from a ht.info.vrs field).
    :return: Python dictionary conforming to GA4GH GKS VRS structure.
    """
    # NOTE(review): index 1 of the parallel VRS_* arrays is used throughout —
    # presumably the alt allele (index 0 being the reference); confirm upstream.
    allele_id = input_vrs.VRS_Allele_IDs[1]

    # Map (build, contig) to its RefGet sequence identifier.
    sequence_id = VRS_CHROM_IDS[input_locus.reference_genome.name][input_locus.contig]

    location = {
        "type": "SequenceLocation",
        "sequence_id": sequence_id,
        "interval": {
            "start": {"type": "Number", "value": input_vrs.VRS_Starts[1]},
            "end": {"type": "Number", "value": input_vrs.VRS_Ends[1]},
            "type": "SequenceInterval",
        },
    }

    # Compute the digest-based GA4GH identifier from the location contents,
    # then attach it to the location itself.
    location["_id"] = ga4gh_core._internal.identifiers.ga4gh_identify(
        ga4gh_vrs.models.SequenceLocation(**location)
    )

    return {
        "_id": allele_id,
        "type": "Allele",
        "location": location,
        "state": {
            "type": "LiteralSequenceExpression",
            "sequence": input_vrs.VRS_States[1],
        },
    }
def add_gks_va(
    input_struct: "hl.struct",
    label_name: str = "gnomAD",
    label_version: str = "3.1.2",
    ancestry_groups: Optional[list] = None,
    ancestry_groups_dict: Optional[dict] = None,
    by_sex: bool = False,
    freq_index_dict: Optional[dict] = None,
) -> dict:
    """
    Generate Python dictionary containing GKS VA annotations.

    Populate the dictionary with frequency information conforming to the GKS VA frequency schema.
    If ancestry_groups or by_sex is provided, also include subcohort schemas for each cohort.
    If input_struct has mean_depth, it is added to ancillaryResults.
    This annotation is added under the gks_va_freq_dict field of the table.
    The focusAllele field is not populated, and must be filled in by the caller.

    :param input_struct: Hail struct for a desired variant (such as result of running .collect()[0] on a Table).
    :param label_name: Label name to use within the returned dictionary. Example: "gnomAD".
    :param label_version: String listing the version of the table being used. Example: "3.1.2".
    :param ancestry_groups: List of strings of shortened names of cohorts to return results for.
        Example: ['afr','fin','nfe']. Default is None.
    :param ancestry_groups_dict: Dict mapping shortened genetic ancestry group names to full names.
        Example: {'afr':'African/African American'}. Default is None.
    :param by_sex: Boolean to include breakdown of cohorts by inferred sex (XX and XY) as well.
        Default is False.
    :param freq_index_dict: Dict mapping groups to their index for freq info in ht.freq_index_dict[0].
        Default is None.
    :return: Dictionary containing GKS VA frequency information
        (split by ancestry groups and sex if desired) for the specified variant.
    """
    # Throw warnings if contradictory arguments passed.
    if by_sex and not ancestry_groups:
        logger.warning(
            "Splitting whole database by sex is not yet supported. If using 'by_sex',"
            " please also specify 'ancestry_groups' to stratify by."
        )

    contig = input_struct.locus.contig
    pos = input_struct.locus.position
    ref = input_struct.alleles[0]
    var = input_struct.alleles[1]
    gnomad_id = f"{contig}-{pos}-{ref}-{var}"

    # Define function to return a frequency report dictionary for a given group
    def _create_group_dicts(
        group_index: int,
        group_id: str,
        group_label: str,
        group_sex: Optional[str] = None,
    ) -> dict:
        """
        Generate a dictionary containing the frequency information of a given variant for a given group.

        :param group_index: Index of frequency within the 'freq' annotation for the desired group.
        :param group_id: String containing variant, genetic ancestry group, and sex (if requested).
            - Example: "chr19-41094895-C-T.afr.XX".
        :param group_label: String containing the full name of genetic ancestry group requested.
            - Example: "African/African American".
        :param group_sex: String indicating the sex of the group.
            - Example: "XX" or "XY".
        :return: Dictionary containing variant frequency information,
            (by genetic ancestry group and sex if desired) for specified variant.
        """
        # Obtain frequency information for the specified variant.
        group_freq = input_struct.freq[group_index]

        # Cohort characteristics.
        characteristics = []
        characteristics.append({"name": "genetic ancestry", "value": group_label})
        if group_sex is not None:
            characteristics.append({"name": "biological sex", "value": group_sex})

        # Dictionary to be returned containing information for a specified group.
        freq_record = {
            "id": f"{gnomad_id},{group_id.upper()}",
            "type": "CohortAlleleFrequency",
            "label": f"{group_label} Cohort Allele Frequency for {gnomad_id}",
            "focusAllele": "#/focusAllele",
            "focusAlleleCount": group_freq["AC"],
            "locusAlleleCount": group_freq["AN"],
            "alleleFrequency": group_freq["AF"],
            "cohort": {"id": group_id.upper(), "characteristics": characteristics},
            "ancillaryResults": {"homozygotes": group_freq["homozygote_count"]},
        }

        return freq_record

    # Create a list to then add the dictionaries for frequency reports for
    # different ancestry groups to.
    list_of_group_info_dicts = []

    # Iterate through provided groups and generate dictionaries.
    if ancestry_groups:
        for group in ancestry_groups:
            key = f"{group}-adj"
            index_value = freq_index_dict.get(key)
            group_result = _create_group_dicts(
                group_index=index_value,
                group_id=group,
                group_label=ancestry_groups_dict[group],
            )

            # If specified, stratify group information by sex.
            if by_sex:
                sex_list = []
                for sex in ["XX", "XY"]:
                    sex_key = f"{group}-{sex}-adj"
                    sex_index_value = freq_index_dict.get(sex_key)
                    sex_id = f"{group}.{sex}"
                    sex_result = _create_group_dicts(
                        group_index=sex_index_value,
                        group_id=sex_id,
                        group_label=ancestry_groups_dict[group],
                        group_sex=sex,
                    )
                    sex_list.append(sex_result)

                group_result["subcohortFrequency"] = sex_list

            list_of_group_info_dicts.append(group_result)

    # Add overall frequency, via label 'adj' which is currently stored at
    # position #1 (index 0).
    overall_freq = input_struct.freq[0]

    # Create final dictionary to be returned.
    final_freq_dict = {
        "id": f"{label_name}-{label_version}-{gnomad_id}",
        "type": "CohortAlleleFrequency",
        "label": f"Overall Cohort Allele Frequency for {gnomad_id}",
        "derivedFrom": {
            "id": f"{label_name}{label_version}",
            "type": "DataSet",
            "label": f"{label_name} v{label_version}",
            "version": f"{label_version}",
        },
        "focusAllele": (
            ""
        ),  # Information can be populated with the result of add_gks_vrs()
        "focusAlleleCount": overall_freq["AC"],
        "locusAlleleCount": overall_freq["AN"],
        "alleleFrequency": overall_freq["AF"],
        "cohort": {"id": "ALL"},
    }

    # Create ancillaryResults for additional frequency and popMaxFAF95 information
    ancillaryResults = {
        "homozygotes": overall_freq["homozygote_count"],
        "popMaxFAF95": {
            "frequency": input_struct.faf95.popmax,
            "confidenceInterval": 0.95,
        },
    }

    if input_struct.faf95.popmax_population is not None:
        # popFreqID is a plain string. (A previous version had a trailing
        # comma after the f-string, which silently wrapped it in a 1-tuple.)
        ancillaryResults["popMaxFAF95"]["popFreqID"] = (
            f"{gnomad_id}.{input_struct.faf95.popmax_population.upper()}"
        )
    else:
        ancillaryResults["popMaxFAF95"]["popFreqID"] = None

    # Add mean coverage depth statistics if the input was annotated
    # with coverage information.
    if "mean_depth" in input_struct:
        ancillaryResults["meanDepth"] = input_struct.mean_depth

    final_freq_dict["ancillaryResults"] = ancillaryResults

    # If ancestry_groups were passed, add the ancestry group dictionary to the
    # final frequency dictionary to be returned.
    if ancestry_groups:
        final_freq_dict["subcohortFrequency"] = list_of_group_info_dicts

    return final_freq_dict