diff --git a/gnomad/resources/resource_utils.py b/gnomad/resources/resource_utils.py index 9b3a311e4..57b397325 100644 --- a/gnomad/resources/resource_utils.py +++ b/gnomad/resources/resource_utils.py @@ -304,6 +304,45 @@ def import_resource(self, overwrite: bool = True, **kwargs) -> None: ) +class ExpressionResource(BaseResource): + """ + A Hail Expression resource. + + :param path: The Expression path (typically ending in .he). + :param import_args: Any sources that are required for the import and need to be + kept track of and/or passed to the import_func (e.g. .vcf path for an imported + VCF). + :param import_func: A function used to import the Expression. `import_func` will be + passed the `import_args` dictionary as kwargs. + """ + + expected_file_extensions: List[str] = [".he"] + + def he(self, force_import: bool = False) -> hl.expr.Expression: + """ + Read and return the Hail Expression resource. + + :return: Hail Expression resource. + """ + if self.path is None or force_import: + return self.import_func(**self.import_args) + else: + return hl.experimental.read_expression(self.path) + + def import_resource(self, overwrite: bool = True, **kwargs) -> None: + """ + Import the Expression resource using its import_func and writes it in its path. + + :param overwrite: If set, existing file(s) will be overwritten. + :param kwargs: Any other parameters to be passed to hl.experimental. + write_expression. + :return: Nothing. + """ + self.import_func(**self.import_args).write( + self.path, overwrite=overwrite, **kwargs + ) + + class BaseVersionedResource: """ Class for a versioned resource. diff --git a/gnomad/utils/constraint.py b/gnomad/utils/constraint.py index f1bd38914..0255ba191 100644 --- a/gnomad/utils/constraint.py +++ b/gnomad/utils/constraint.py @@ -5,6 +5,8 @@ import hail as hl +from gnomad.utils.vep import explode_by_vep_annotation, process_consequences + logging.basicConfig( format="%(asctime)s (%(name)s %(lineno)s): %(message)s", datefmt="%m/%d/%Y %I:%M:%S %p", @@ -311,7 +313,9 @@ def annotate_mutation_type( :return: Table with mutation type annotations added. """ # Determine the context length by collecting all the context lengths. - context_lengths = list(filter(None, set(hl.len(t.context).collect()))) + context_lengths = list( + filter(None, t.aggregate(hl.agg.collect_as_set(hl.len(t.context)))) + ) if len(context_lengths) > 1: raise ValueError( "More than one length was found among the first 100 'context' values." @@ -700,3 +704,176 @@ def get_all_pop_lengths( ) return pop_lengths + + +def get_constraint_grouping_expr( + vep_annotation_expr: hl.StructExpression, + coverage_expr: Optional[hl.Int32Expression] = None, + include_transcript_group: bool = True, + include_canonical_group: bool = True, +) -> Dict[str, Union[hl.StringExpression, hl.Int32Expression, hl.BooleanExpression]]: + """ + Collect annotations used for constraint groupings. + + Function collects the following annotations: + - annotation - 'most_severe_consequence' annotation in `vep_annotation_expr` + - modifier - classic lof annotation from 'lof' annotation in + `vep_annotation_expr`, LOFTEE annotation from 'lof' annotation in + `vep_annotation_expr`, PolyPhen annotation from 'polyphen_prediction' in + `vep_annotation_expr`, or "None" if neither is defined + - gene - 'gene_symbol' annotation inside `vep_annotation_expr` + - coverage - exome coverage if `coverage_expr` is specified + - transcript - id from 'transcript_id' in `vep_annotation_expr` (added when + `include_transcript_group` is True) + - canonical from `vep_annotation_expr` (added when `include_canonical_group` is + True) + + .. note:: + This function expects that the following fields are present in + `vep_annotation_expr`: + - lof + - polyphen_prediction + - most_severe_consequence + - gene_symbol + - transcript_id (if `include_transcript_group` is True) + - canonical (if `include_canonical_group` is True) + + :param vep_annotation_expr: StructExpression of VEP annotation. + :param coverage_expr: Optional Int32Expression of exome coverage. Default is None. + :param include_transcript_group: Whether to include the transcript annotation in the + groupings. Default is True. + :param include_canonical_group: Whether to include canonical annotation in the + groupings. Default is True. + :return: A dictionary with keys as annotation names and values as actual + annotations. + """ + lof_expr = vep_annotation_expr.lof + polyphen_prediction_expr = vep_annotation_expr.polyphen_prediction + + # Create constraint annotations to be used for groupings. + groupings = { + "annotation": vep_annotation_expr.most_severe_consequence, + "modifier": hl.coalesce(lof_expr, polyphen_prediction_expr, "None"), + "gene": vep_annotation_expr.gene_symbol, + } + if coverage_expr is not None: + groupings["coverage"] = coverage_expr + + # Add 'transcript' and 'canonical' annotation if requested. + if include_transcript_group: + groupings["transcript"] = vep_annotation_expr.transcript_id + if include_canonical_group: + groupings["canonical"] = hl.or_else(vep_annotation_expr.canonical == 1, False) + + return groupings + + +def annotate_exploded_vep_for_constraint_groupings( + ht: hl.Table, vep_annotation: str = "transcript_consequences" +) -> Tuple[Union[hl.Table, hl.MatrixTable], Tuple[str]]: + """ + Annotate Table with annotations used for constraint groupings. + + Function explodes the specified VEP annotation (`vep_annotation`) and adds the following annotations: + - annotation -'most_severe_consequence' annotation in `vep_annotation` + - modifier - classic lof annotation from 'lof' annotation in + `vep_annotation`, LOFTEE annotation from 'lof' annotation in + `vep_annotation`, PolyPhen annotation from 'polyphen_prediction' in + `vep_annotation`, or "None" if neither is defined + - gene - 'gene_symbol' annotation inside `vep_annotation` + - coverage - exome coverage in `ht` + - transcript - id from 'transcript_id' in `vep_annotation` (added when + `include_transcript_group` is True) + - canonical from `vep_annotation` (added when `include_canonical_group` is + True) + + .. note:: + This function expects that the following annotations are present in `ht`: + - vep + - exome_coverage + + :param t: Input Table or MatrixTable. + :param vep_annotation: Name of annotation in 'vep' annotation (one of + "transcript_consequences" and "worst_csq_by_gene") that will be used for + obtaining constraint annotations. Default is "transcript_consequences". + :return: A tuple of input Table or MatrixTable with grouping annotations added and + the names of added annotations. + """ + if vep_annotation == "transcript_consequences": + include_transcript_group = include_canonical_group = True + else: + include_transcript_group = include_canonical_group = False + + # Annotate 'worst_csq_by_gene' to `ht` if it's specified for `vep_annotation`. + if vep_annotation == "worst_csq_by_gene": + ht = process_consequences(ht) + + # Explode the specified VEP annotation. + ht = explode_by_vep_annotation(ht, vep_annotation) + + # Collect the annotations used for groupings. + groupings = get_constraint_grouping_expr( + ht[vep_annotation], + coverage_expr=ht.exome_coverage, + include_transcript_group=include_transcript_group, + include_canonical_group=include_canonical_group, + ) + + return ht.annotate(**groupings), tuple(groupings.keys()) + + +def compute_expected_variants( + ht: hl.Table, + plateau_models_expr: hl.StructExpression, + mu_expr: hl.Float64Expression, + cov_corr_expr: hl.Float64Expression, + cpg_expr: hl.BooleanExpression, + pop: Optional[str] = None, +) -> Dict[str, Union[hl.Float64Expression, hl.Int64Expression]]: + """ + Apply plateau models for all sites and for a population (if specified) to compute predicted proportion observed ratio and expected variant counts. + + :param ht: Input Table. + :param plateau_models_expr: Linear models (output of `build_models()`, with the values + of the dictionary formatted as a StructExpression of intercept and slope, that + calibrates mutation rate to proportion observed for high coverage exomes. It + includes models for CpG, non-CpG sites, and each population in `POPS`. + :param mu_expr: Float64Expression of mutation rate. + :param cov_corr_expr: Float64Expression of corrected coverage expression. + :param cpg_expr: BooleanExpression noting whether a site is a CPG site. + :param pop: Optional population to use when applying plateau model. Default is + None. + :return: A dictionary with predicted proportion observed ratio and expected variant + counts. + """ + if pop is None: + pop = "" + plateau_model = hl.literal(plateau_models_expr.total)[cpg_expr] + slope = plateau_model[1] + intercept = plateau_model[0] + agg_func = hl.agg.sum + ann_to_sum = ["observed_variants", "possible_variants"] + else: + plateau_model = hl.literal(plateau_models_expr[pop]) + slope = hl.map(lambda f: f[cpg_expr][1], plateau_model) + intercept = hl.map(lambda f: f[cpg_expr][0], plateau_model) + agg_func = hl.agg.array_sum + pop = f"_{pop}" + ann_to_sum = [f"downsampling_counts{pop}"] + + # Apply plateau models for specified population. + ppo_expr = mu_expr * slope + intercept + + # Generate sum aggregators for 'predicted_proportion_observed' and + # 'expected_variants', for specified population. + agg_expr = { + f"predicted_proportion_observed{pop}": agg_func(ppo_expr), + f"expected_variants{pop}": agg_func(ppo_expr * cov_corr_expr), + } + + # Generate sum aggregators for 'observed_variants' and 'possible_variants' on + # the entire dataset if pop is None, and for 'downsampling_counts' for + # specified population if pop is not None. + agg_expr.update({ann: agg_func(ht[ann]) for ann in ann_to_sum}) + + return agg_expr diff --git a/gnomad/utils/vep.py b/gnomad/utils/vep.py index ecdcb5044..c0135003d 100644 --- a/gnomad/utils/vep.py +++ b/gnomad/utils/vep.py @@ -640,3 +640,32 @@ def add_most_severe_csq_to_tc_within_vep_root( if isinstance(t, hl.MatrixTable) else t.annotate(**{vep_root: annotation}) ) + + +def explode_by_vep_annotation( + t: Union[hl.Table, hl.MatrixTable], + vep_annotation: str = "transcript_consequences", + vep_root: str = "vep", +) -> Union[hl.Table, hl.MatrixTable]: + """ + Explode the specified VEP annotation on the input Table/MatrixTable. + + :param t: Input Table or MatrixTable. + :param vep_annotation: Name of annotation in `vep_root` to explode. + :param vep_root: Name used for root VEP annotation. Default is 'vep'. + :return: Table or MatrixTable with exploded VEP annotation. + """ + if vep_annotation not in t[vep_root].keys(): + raise ValueError( + f"{vep_annotation} is not a row field of the {vep_root} annotation in" + " Table/MatrixTable!" + ) + # Create top-level annotation for `vep_annotation` and explode it. + if isinstance(t, hl.Table): + t = t.transmute(**{vep_annotation: t[vep_root][vep_annotation]}) + t = t.explode(t[vep_annotation]) + else: + t = t.transmute_rows(**{vep_annotation: t[vep_root][vep_annotation]}) + t = t.explode_rows(t[vep_annotation]) + + return t