From 5b62076b986a1c0261d3129d19792b5028104d9b Mon Sep 17 00:00:00 2001 From: jkgoodrich <33063077+jkgoodrich@users.noreply.github.com> Date: Fri, 30 Jun 2023 15:40:09 -0600 Subject: [PATCH 1/3] Updates to `assign_population_pcs` to use onnx models --- gnomad/sample_qc/ancestry.py | 127 +++++++++++++++++++++++++++++++---- requirements.txt | 3 + 2 files changed, 116 insertions(+), 14 deletions(-) diff --git a/gnomad/sample_qc/ancestry.py b/gnomad/sample_qc/ancestry.py index 90078049f..0196c405c 100644 --- a/gnomad/sample_qc/ancestry.py +++ b/gnomad/sample_qc/ancestry.py @@ -2,10 +2,16 @@ import logging import random -from typing import Any, Counter, List, Optional, Tuple, Union +from collections import Counter +from typing import Any, Callable, List, Optional, Tuple, Union import hail as hl +import numpy as np +import onnx +import onnxruntime as rt import pandas as pd +from skl2onnx import convert_sklearn +from skl2onnx.common.data_types import FloatTensorType from gnomad.utils.filtering import filter_to_autosomes @@ -117,6 +123,85 @@ def pc_project( return mt.cols().select("scores") +def apply_onnx_classification_model( + data_pd: pd.DataFrame, fit: onnx.ModelProto +) -> Tuple[np.ndarray, pd.DataFrame]: + """ + Apply an ONNX classification model `fit` to a pandas dataframe `data_pd`. + + :param data_pd: Pandas dataframe containing the data to be classified. + :param fit: ONNX model to be applied. + :return: Tuple of classification and probabilities. + """ + if not isinstance(fit, onnx.ModelProto): + raise TypeError(f"The model supplied is not an onnx model!") + + sess = rt.InferenceSession( + fit.SerializeToString(), providers=["CPUExecutionProvider"] + ) + input_name = sess.get_inputs()[0].name + label_name = sess.get_outputs()[0].name + prob_name = sess.get_outputs()[1].name + classification = sess.run([label_name], {input_name: data_pd.astype(np.float32)})[0] + probs = sess.run([prob_name], {input_name: data_pd.astype(np.float32)})[0] + probs = pd.DataFrame.from_dict(probs) + + return classification, probs + + +def apply_sklearn_classification_model( + data_pd: pd.DataFrame, fit: Any +) -> Tuple[np.ndarray, pd.DataFrame]: + """ + Apply an sklearn classification model `fit` to a pandas dataframe `data_pd`. + + :param data_pd: Pandas dataframe containing the data to be classified. + :param fit: Sklearn model to be applied. + :return: Tuple of classification and probabilities. + """ + from sklearn.utils.validation import check_is_fitted + + try: + check_is_fitted(fit) + except TypeError: + raise TypeError("The supplied model is not an sklearn model!") + + classification = fit.predict(data_pd) + probs = fit.predict_proba(data_pd) + probs = pd.DataFrame(probs, columns=[f"prob_{p}" for p in fit.classes_]) + + return classification, probs + + +def convert_sklearn_rf_to_onnx( + fit: Any, target_opset: Optional[int] = None +) -> onnx.ModelProto: + """ + Convert a sklearn random forest model to ONNX. + + :param fit: Sklearn random forest model to be converted. + :param target_opset: An optional target ONNX opset version to convert the model to. + :return: ONNX model. 
+ """ + if not isinstance(fit, onnx.ModelProto): + raise TypeError(f"The model supplied is not an onnx model!") + + initial_type = [("float_input", FloatTensorType([None, fit.n_features_]))] + onx = convert_sklearn(fit, initial_types=initial_type, target_opset=target_opset) + + domains = onx.opset_import + opset_version = "" + for dom in domains: + opset_version += f"domain: {dom.domain}, version: {dom.version}\n" + + logger.info( + "sklearn model converted to onnx model with the following opset version: \n%s", + opset_version, + ) + + return onx + + def assign_population_pcs( pop_pca_scores: Union[hl.Table, pd.DataFrame], pc_cols: Union[hl.expr.ArrayExpression, List[int], List[str]], @@ -129,6 +214,10 @@ def assign_population_pcs( output_col: str = "pop", missing_label: str = "oth", pc_expr: Union[hl.expr.ArrayExpression, str] = "scores", + convert_model_func: Optional[Callable[[Any], Any]] = None, + apply_model_func: Callable[ + [pd.DataFrame, Any], Any + ] = apply_sklearn_classification_model, ) -> Tuple[ Union[hl.Table, pd.DataFrame], Any ]: # 2nd element of the tuple should be RandomForestClassifier but we do not want to import sklearn.RandomForestClassifier outside @@ -175,6 +264,12 @@ def assign_population_pcs( smaller than `min_prob`. :param pc_expr: Column storing the list of PCs. Only used if `pc_cols` is a List of integers. Default is scores. + :param convert_model_func: Optional function to convert the model to ONNX format. + Default is no conversion. + :param apply_model_func: Function to apply the model to the data. Default is + `apply_sklearn_classification_model`, which will apply a sklearn classification + model to the data. This default will work if no `fit` is set, or the supplied + `fit` is a sklearn classification model. :return: Hail Table or Pandas Dataframe (depending on input) containing sample IDs and imputed population labels, trained random forest model. """ @@ -224,7 +319,7 @@ def assign_population_pcs( ) pop_pc_pd = pop_pca_scores - # Split training data into subsamples for fitting and evaluating + # Split training data into subsamples for fitting and evaluating. if not fit: train_data = pop_pc_pd.loc[~pop_pc_pd[known_col].isnull()] N = len(train_data) @@ -234,33 +329,34 @@ def assign_population_pcs( fit_samples = [x for x in train_fit["s"]] evaluate_fit = train_data.loc[~train_data["s"].isin(fit_samples)] - # Train RF + # Train RF. training_set_known_labels = train_fit[known_col].values training_set_pcs = train_fit[pc_cols].values evaluation_set_pcs = evaluate_fit[pc_cols].values pop_clf = RandomForestClassifier(n_estimators=n_estimators, random_state=seed) pop_clf.fit(training_set_pcs, training_set_known_labels) - print( - "Random forest feature importances are as follows: {}".format( - pop_clf.feature_importances_ - ) + logger.info( + "Random forest feature importances are as follows: %s", + pop_clf.feature_importances_, ) - # Evaluate RF + # Evaluate RF. predictions = pop_clf.predict(evaluation_set_pcs) error_rate = 1 - sum(evaluate_fit[known_col] == predictions) / float( len(predictions) ) - print("Estimated error rate for RF model is {}".format(error_rate)) + logger.info("Estimated error rate for RF model is %.4f", error_rate) else: pop_clf = fit - # Classify data - pop_pc_pd[output_col] = pop_clf.predict(pop_pc_pd[pc_cols].values) - probs = pop_clf.predict_proba(pop_pc_pd[pc_cols].values) - probs = pd.DataFrame(probs, columns=[f"prob_{p}" for p in pop_clf.classes_]) - pop_pc_pd = pd.concat([pop_pc_pd, probs], axis=1) + # Classify data. 
+ classifications, probs = apply_model_func(pop_pc_pd[pc_cols].values, pop_clf) + + pop_pc_pd[output_col] = classifications + pop_pc_pd = pd.concat( + [pop_pc_pd.reset_index(drop=True), probs.reset_index(drop=True)], axis=1 + ) probs["max"] = probs.max(axis=1) pop_pc_pd.loc[probs["max"] < min_prob, output_col] = missing_label pop_pc_pd = pop_pc_pd.drop(pc_cols, axis="columns") @@ -272,6 +368,9 @@ def assign_population_pcs( ), ) + if convert_model_func is not None: + pop_clf = convert_model_func(pop_clf) + if hail_input: pops_ht = hl.Table.from_pandas(pop_pc_pd, key=list(pop_pca_scores.key)) pops_ht = pops_ht.annotate_globals( diff --git a/requirements.txt b/requirements.txt index 60d326875..ce9bd678d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,5 +3,8 @@ hail hdbscan ipywidgets networkx +onnx +onnxruntime scikit-learn +skl2onnx slackclient==2.5.0 From c775ffde4bb2a0848963ca2e9a3fa685820f3cf6 Mon Sep 17 00:00:00 2001 From: jkgoodrich <33063077+jkgoodrich@users.noreply.github.com> Date: Fri, 30 Jun 2023 22:06:47 -0600 Subject: [PATCH 2/3] Fix model check in `convert_sklearn_rf_to_onnx` --- gnomad/sample_qc/ancestry.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/gnomad/sample_qc/ancestry.py b/gnomad/sample_qc/ancestry.py index 0196c405c..96d067c33 100644 --- a/gnomad/sample_qc/ancestry.py +++ b/gnomad/sample_qc/ancestry.py @@ -145,6 +145,7 @@ def apply_onnx_classification_model( classification = sess.run([label_name], {input_name: data_pd.astype(np.float32)})[0] probs = sess.run([prob_name], {input_name: data_pd.astype(np.float32)})[0] probs = pd.DataFrame.from_dict(probs) + probs = probs.add_prefix("prob_") return classification, probs @@ -183,10 +184,14 @@ def convert_sklearn_rf_to_onnx( :param target_opset: An optional target ONNX opset version to convert the model to. :return: ONNX model. """ - if not isinstance(fit, onnx.ModelProto): - raise TypeError(f"The model supplied is not an onnx model!") + from sklearn.utils.validation import check_is_fitted + + try: + check_is_fitted(fit) + except TypeError: + raise TypeError("The supplied model is not an sklearn model!") - initial_type = [("float_input", FloatTensorType([None, fit.n_features_]))] + initial_type = [("float_input", FloatTensorType([None, fit.n_features_in_]))] onx = convert_sklearn(fit, initial_types=initial_type, target_opset=target_opset) domains = onx.opset_import From 8b9f87e431aead2629616b6c31462d4801252b0b Mon Sep 17 00:00:00 2001 From: jkgoodrich <33063077+jkgoodrich@users.noreply.github.com> Date: Fri, 30 Jun 2023 22:36:33 -0600 Subject: [PATCH 3/3] remove unneeded fstring --- gnomad/sample_qc/ancestry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gnomad/sample_qc/ancestry.py b/gnomad/sample_qc/ancestry.py index 96d067c33..c426cd34e 100644 --- a/gnomad/sample_qc/ancestry.py +++ b/gnomad/sample_qc/ancestry.py @@ -134,7 +134,7 @@ def apply_onnx_classification_model( :return: Tuple of classification and probabilities. """ if not isinstance(fit, onnx.ModelProto): - raise TypeError(f"The model supplied is not an onnx model!") + raise TypeError("The model supplied is not an onnx model!") sess = rt.InferenceSession( fit.SerializeToString(), providers=["CPUExecutionProvider"]
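
For reference, a minimal sketch (not part of the patches) of the sklearn-to-ONNX round trip the new helpers enable. The toy PC matrix, ancestry labels, and hyperparameters below are invented for illustration; only the three helper functions come from this patch series.

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier

from gnomad.sample_qc.ancestry import (
    apply_onnx_classification_model,
    apply_sklearn_classification_model,
    convert_sklearn_rf_to_onnx,
)

# Toy data: 500 samples x 4 PCs with made-up ancestry labels.
rng = np.random.default_rng(0)
pcs = pd.DataFrame(
    rng.normal(size=(500, 4)), columns=[f"PC{i}" for i in range(1, 5)]
)
labels = rng.choice(["afr", "eas", "nfe"], size=500)

rf = RandomForestClassifier(n_estimators=100, random_state=24)
rf.fit(pcs.values, labels)

# Convert the fitted random forest to ONNX. After patch 2 the input shape is taken
# from `n_features_in_`, so the model must already be fit.
onnx_fit = convert_sklearn_rf_to_onnx(rf)

# Both apply functions take the PC matrix and a model and return
# (classifications, probabilities), mirroring how `assign_population_pcs` calls
# `apply_model_func(pop_pc_pd[pc_cols].values, pop_clf)`.
skl_calls, skl_probs = apply_sklearn_classification_model(pcs.values, rf)
onnx_calls, onnx_probs = apply_onnx_classification_model(pcs.values, onnx_fit)

# The hard calls should agree; probabilities can differ slightly because the ONNX
# graph evaluates in float32.
print((skl_calls == onnx_calls).mean())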
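
And a sketch of how the new `convert_model_func` / `apply_model_func` arguments to `assign_population_pcs` might be used end to end. The Hail Tables `pop_pca_scores_ht` and `new_pop_pca_scores_ht` (keyed by sample, with a `scores` PC array and a `training_pop` field), the choice of 16 PCs, and the gs:// output path are all hypothetical; only the argument names and helper functions come from the patches.

import hail as hl
import onnx

from gnomad.sample_qc.ancestry import (
    apply_onnx_classification_model,
    assign_population_pcs,
    convert_sklearn_rf_to_onnx,
)

# Train an RF on samples with known labels and get the fit back as an ONNX model.
pop_ht, onnx_fit = assign_population_pcs(
    pop_pca_scores_ht,  # placeholder Table with `scores` and `training_pop` fields.
    pc_cols=list(range(1, 17)),
    known_col="training_pop",
    min_prob=0.75,
    convert_model_func=convert_sklearn_rf_to_onnx,
)

# Persist the ONNX model so the same classifier can be reapplied later.
with hl.hadoop_open("gs://my-bucket/pop_rf_fit.onnx", "wb") as f:
    f.write(onnx_fit.SerializeToString())

# Later (possibly in an environment without the original sklearn fit): reload the
# model and classify a new score table without retraining.
with hl.hadoop_open("gs://my-bucket/pop_rf_fit.onnx", "rb") as f:
    loaded_fit = onnx.load(f)

new_pop_ht, _ = assign_population_pcs(
    new_pop_pca_scores_ht,  # placeholder Table of new PCA scores.
    pc_cols=list(range(1, 17)),
    fit=loaded_fit,
    min_prob=0.75,
    apply_model_func=apply_onnx_classification_model,
)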