Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions gnomad/resources/resource_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,45 @@ def import_resource(self, overwrite: bool = True, **kwargs) -> None:
)


class ExpressionResource(BaseResource):
"""
A Hail Expression resource.

:param path: The Expression path (typically ending in .he).
:param import_args: Any sources that are required for the import and need to be
kept track of and/or passed to the import_func (e.g. .vcf path for an imported
VCF).
:param import_func: A function used to import the Expression. `import_func` will be
passed the `import_args` dictionary as kwargs.
"""

expected_file_extensions: List[str] = [".he"]

def he(self, force_import: bool = False) -> hl.expr.Expression:
"""
Read and return the Hail Expression resource.

:return: Hail Expression resource.
"""
if self.path is None or force_import:
return self.import_func(**self.import_args)
else:
return hl.experimental.read_expression(self.path)

def import_resource(self, overwrite: bool = True, **kwargs) -> None:
"""
Import the Expression resource using its import_func and writes it in its path.

:param overwrite: If set, existing file(s) will be overwritten.
:param kwargs: Any other parameters to be passed to hl.experimental.
write_expression.
:return: Nothing.
"""
self.import_func(**self.import_args).write(
self.path, overwrite=overwrite, **kwargs
)


class BaseVersionedResource:
"""
Class for a versioned resource.
Expand Down
179 changes: 178 additions & 1 deletion gnomad/utils/constraint.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import hail as hl

from gnomad.utils.vep import explode_by_vep_annotation, process_consequences

logging.basicConfig(
format="%(asctime)s (%(name)s %(lineno)s): %(message)s",
datefmt="%m/%d/%Y %I:%M:%S %p",
Expand Down Expand Up @@ -311,7 +313,9 @@ def annotate_mutation_type(
:return: Table with mutation type annotations added.
"""
# Determine the context length by collecting all the context lengths.
context_lengths = list(filter(None, set(hl.len(t.context).collect())))
context_lengths = list(
filter(None, t.aggregate(hl.agg.collect_as_set(hl.len(t.context))))
)
if len(context_lengths) > 1:
raise ValueError(
"More than one length was found among the first 100 'context' values."
Expand Down Expand Up @@ -700,3 +704,176 @@ def get_all_pop_lengths(
)

return pop_lengths


def get_constraint_grouping_expr(
vep_annotation_expr: hl.StructExpression,
coverage_expr: Optional[hl.Int32Expression] = None,
include_transcript_group: bool = True,
include_canonical_group: bool = True,
) -> Dict[str, Union[hl.StringExpression, hl.Int32Expression, hl.BooleanExpression]]:
"""
Collect annotations used for constraint groupings.

Function collects the following annotations:
- annotation - 'most_severe_consequence' annotation in `vep_annotation_expr`
- modifier - classic lof annotation from 'lof' annotation in
`vep_annotation_expr`, LOFTEE annotation from 'lof' annotation in
`vep_annotation_expr`, PolyPhen annotation from 'polyphen_prediction' in
`vep_annotation_expr`, or "None" if neither is defined
- gene - 'gene_symbol' annotation inside `vep_annotation_expr`
- coverage - exome coverage if `coverage_expr` is specified
- transcript - id from 'transcript_id' in `vep_annotation_expr` (added when
`include_transcript_group` is True)
- canonical from `vep_annotation_expr` (added when `include_canonical_group` is
True)

.. note::
This function expects that the following fields are present in
`vep_annotation_expr`:
- lof
- polyphen_prediction
- most_severe_consequence
- gene_symbol
- transcript_id (if `include_transcript_group` is True)
- canonical (if `include_canonical_group` is True)

:param vep_annotation_expr: StructExpression of VEP annotation.
:param coverage_expr: Optional Int32Expression of exome coverage. Default is None.
:param include_transcript_group: Whether to include the transcript annotation in the
groupings. Default is True.
:param include_canonical_group: Whether to include canonical annotation in the
groupings. Default is True.
:return: A dictionary with keys as annotation names and values as actual
annotations.
"""
lof_expr = vep_annotation_expr.lof
polyphen_prediction_expr = vep_annotation_expr.polyphen_prediction

# Create constraint annotations to be used for groupings.
groupings = {
"annotation": vep_annotation_expr.most_severe_consequence,
"modifier": hl.coalesce(lof_expr, polyphen_prediction_expr, "None"),
"gene": vep_annotation_expr.gene_symbol,
}
if coverage_expr is not None:
groupings["coverage"] = coverage_expr

# Add 'transcript' and 'canonical' annotation if requested.
if include_transcript_group:
groupings["transcript"] = vep_annotation_expr.transcript_id
if include_canonical_group:
groupings["canonical"] = hl.or_else(vep_annotation_expr.canonical == 1, False)

return groupings


def annotate_exploded_vep_for_constraint_groupings(
ht: hl.Table, vep_annotation: str = "transcript_consequences"
) -> Tuple[Union[hl.Table, hl.MatrixTable], Tuple[str]]:
"""
Annotate Table with annotations used for constraint groupings.

Function explodes the specified VEP annotation (`vep_annotation`) and adds the following annotations:
- annotation -'most_severe_consequence' annotation in `vep_annotation`
- modifier - classic lof annotation from 'lof' annotation in
`vep_annotation`, LOFTEE annotation from 'lof' annotation in
`vep_annotation`, PolyPhen annotation from 'polyphen_prediction' in
`vep_annotation`, or "None" if neither is defined
- gene - 'gene_symbol' annotation inside `vep_annotation`
- coverage - exome coverage in `ht`
- transcript - id from 'transcript_id' in `vep_annotation` (added when
`include_transcript_group` is True)
- canonical from `vep_annotation` (added when `include_canonical_group` is
True)

.. note::
This function expects that the following annotations are present in `ht`:
- vep
- exome_coverage

:param t: Input Table or MatrixTable.
:param vep_annotation: Name of annotation in 'vep' annotation (one of
"transcript_consequences" and "worst_csq_by_gene") that will be used for
obtaining constraint annotations. Default is "transcript_consequences".
:return: A tuple of input Table or MatrixTable with grouping annotations added and
the names of added annotations.
"""
if vep_annotation == "transcript_consequences":
include_transcript_group = include_canonical_group = True
else:
include_transcript_group = include_canonical_group = False

# Annotate 'worst_csq_by_gene' to `ht` if it's specified for `vep_annotation`.
if vep_annotation == "worst_csq_by_gene":
ht = process_consequences(ht)

# Explode the specified VEP annotation.
ht = explode_by_vep_annotation(ht, vep_annotation)

# Collect the annotations used for groupings.
groupings = get_constraint_grouping_expr(
ht[vep_annotation],
coverage_expr=ht.exome_coverage,
include_transcript_group=include_transcript_group,
include_canonical_group=include_canonical_group,
)

return ht.annotate(**groupings), tuple(groupings.keys())


def compute_expected_variants(
ht: hl.Table,
plateau_models_expr: hl.StructExpression,
mu_expr: hl.Float64Expression,
cov_corr_expr: hl.Float64Expression,
cpg_expr: hl.BooleanExpression,
pop: Optional[str] = None,
) -> Dict[str, Union[hl.Float64Expression, hl.Int64Expression]]:
"""
Apply plateau models for all sites and for a population (if specified) to compute predicted proportion observed ratio and expected variant counts.

:param ht: Input Table.
:param plateau_models_expr: Linear models (output of `build_models()`, with the values
of the dictionary formatted as a StructExpression of intercept and slope, that
calibrates mutation rate to proportion observed for high coverage exomes. It
includes models for CpG, non-CpG sites, and each population in `POPS`.
:param mu_expr: Float64Expression of mutation rate.
:param cov_corr_expr: Float64Expression of corrected coverage expression.
:param cpg_expr: BooleanExpression noting whether a site is a CPG site.
:param pop: Optional population to use when applying plateau model. Default is
None.
:return: A dictionary with predicted proportion observed ratio and expected variant
counts.
"""
if pop is None:
pop = ""
plateau_model = hl.literal(plateau_models_expr.total)[cpg_expr]
slope = plateau_model[1]
intercept = plateau_model[0]
agg_func = hl.agg.sum
ann_to_sum = ["observed_variants", "possible_variants"]
else:
plateau_model = hl.literal(plateau_models_expr[pop])
slope = hl.map(lambda f: f[cpg_expr][1], plateau_model)
intercept = hl.map(lambda f: f[cpg_expr][0], plateau_model)
agg_func = hl.agg.array_sum
pop = f"_{pop}"
ann_to_sum = [f"downsampling_counts{pop}"]

# Apply plateau models for specified population.
ppo_expr = mu_expr * slope + intercept

# Generate sum aggregators for 'predicted_proportion_observed' and
# 'expected_variants', for specified population.
agg_expr = {
f"predicted_proportion_observed{pop}": agg_func(ppo_expr),
f"expected_variants{pop}": agg_func(ppo_expr * cov_corr_expr),
}

# Generate sum aggregators for 'observed_variants' and 'possible_variants' on
# the entire dataset if pop is None, and for 'downsampling_counts' for
# specified population if pop is not None.
agg_expr.update({ann: agg_func(ht[ann]) for ann in ann_to_sum})

return agg_expr
29 changes: 29 additions & 0 deletions gnomad/utils/vep.py
Original file line number Diff line number Diff line change
Expand Up @@ -640,3 +640,32 @@ def add_most_severe_csq_to_tc_within_vep_root(
if isinstance(t, hl.MatrixTable)
else t.annotate(**{vep_root: annotation})
)


def explode_by_vep_annotation(
t: Union[hl.Table, hl.MatrixTable],
vep_annotation: str = "transcript_consequences",
vep_root: str = "vep",
) -> Union[hl.Table, hl.MatrixTable]:
"""
Explode the specified VEP annotation on the input Table/MatrixTable.

:param t: Input Table or MatrixTable.
:param vep_annotation: Name of annotation in `vep_root` to explode.
:param vep_root: Name used for root VEP annotation. Default is 'vep'.
:return: Table or MatrixTable with exploded VEP annotation.
"""
if vep_annotation not in t[vep_root].keys():
raise ValueError(
f"{vep_annotation} is not a row field of the {vep_root} annotation in"
" Table/MatrixTable!"
)
# Create top-level annotation for `vep_annotation` and explode it.
if isinstance(t, hl.Table):
t = t.transmute(**{vep_annotation: t[vep_root][vep_annotation]})
t = t.explode(t[vep_annotation])
else:
t = t.transmute_rows(**{vep_annotation: t[vep_root][vep_annotation]})
t = t.explode_rows(t[vep_annotation])

return t