diff --git a/docs/api/summarize.rst b/docs/api/summarize.rst index 9aa2fa20c..6e067966f 100644 --- a/docs/api/summarize.rst +++ b/docs/api/summarize.rst @@ -8,3 +8,14 @@ Modules ------- .. autofunction:: summarize +.. autofunction:: extract_keywords + +Keyword Extraction Engines +-------------------------- + +KeyBERT ++++++++ + +.. automodule:: pythainlp.summarize.keybert +.. autoclass:: pythainlp.summarize.keybert.KeyBERT + :members: diff --git a/pythainlp/summarize/__init__.py b/pythainlp/summarize/__init__.py index a56d74fd2..359b11024 100644 --- a/pythainlp/summarize/__init__.py +++ b/pythainlp/summarize/__init__.py @@ -3,11 +3,10 @@ Text summarization """ -__all__ = [ - "summarize", -] +__all__ = ["summarize", "extract_keywords"] DEFAULT_SUMMARIZE_ENGINE = "frequency" CPE_KMUTT_THAI_SENTENCE_SUM = "mt5-cpe-kmutt-thai-sentence-sum" +DEFAULT_KEYWORD_EXTRACTION_ENGINE = "keybert" -from pythainlp.summarize.core import summarize +from pythainlp.summarize.core import summarize, extract_keywords diff --git a/pythainlp/summarize/core.py b/pythainlp/summarize/core.py index 94c13e231..e2bffb7cc 100644 --- a/pythainlp/summarize/core.py +++ b/pythainlp/summarize/core.py @@ -1,14 +1,14 @@ # -*- coding: utf-8 -*- """ -Text summarization +Text summarization and Keyword extraction """ - -from typing import List +from typing import List, Iterable, Optional, Tuple from pythainlp.summarize import ( DEFAULT_SUMMARIZE_ENGINE, CPE_KMUTT_THAI_SENTENCE_SUM, + DEFAULT_KEYWORD_EXTRACTION_ENGINE, ) from pythainlp.summarize.freq import FrequencySummarizer from pythainlp.tokenize import sent_tokenize @@ -112,3 +112,138 @@ def summarize( sents = sent_tokenize(text, engine="whitespace+newline")[:n] return sents + + +def extract_keywords( + text: str, + keyphrase_ngram_range: Tuple[int, int] = (1, 2), + max_keywords: int = 5, + min_df: int = 1, + engine: str = DEFAULT_KEYWORD_EXTRACTION_ENGINE, + tokenizer: str = "newmm", + stop_words: Optional[Iterable[str]] = None, +) -> List[str]: + """ + This function returns most-relevant keywords (and/or keyphrases) from the input document. + Each algorithm may produce completely different keywords from each other, + so please be careful when choosing the algorithm. + + *Note*: Calling :func: `extract_keywords()` is expensive. For repetitive use of KeyBERT (the default engine), + creating KeyBERT object is highly recommended. + + :param str text: text to be summarized + :param Tuple[int, int] keyphrase_ngram_range: Number of token units to be defined as keyword. + The token unit varies w.r.t. `tokenizer_engine`. + For instance, (1, 1) means each token (unigram) can be a keyword (e.g. "เสา", "ไฟฟ้า"), + (1, 2) means one and two consecutive tokens (unigram and bigram) can be keywords + (e.g. "เสา", "ไฟฟ้า", "เสาไฟฟ้า") (default: (1, 2)) + :param int max_keywords: Number of maximum keywords to be returned. (default: 5) + :param int min_df: Minimum frequency required to be a keyword. (default: 1) + :param str engine: Name of algorithm to use for keyword extraction. (default: 'keybert') + :param str tokenizer: Name of tokenizer engine to use. + Refer to options in :func: `pythainlp.tokenize.word_tokenizer() (default: 'newmm') + :param Optional[Iterable[str]] stop_words: A list of stop words (a.k.a words to be ignored). + If not specified, :func:`pythainlp.corpus.thai_stopwords` is used. (default: None) + + :return: list of keywords + + **Options for engine** + * *keybert* (default) - KeyBERT keyword extraction algorithm + * *frequency* - frequency of words + + :Example: + :: + + from pythainlp.summarize import extract_keywords + + text = ''' + อาหาร หมายถึง ของแข็งหรือของเหลว + ที่กินหรือดื่มเข้าสู่ร่างกายแล้ว + จะทำให้เกิดพลังงานและความร้อนแก่ร่างกาย + ทำให้ร่างกายเจริญเติบโต + ซ่อมแซมส่วนที่สึกหรอ ควบคุมการเปลี่ยนแปลงต่างๆ ในร่างกาย + ช่วยทำให้อวัยวะต่างๆ ทำงานได้อย่างปกติ + อาหารจะต้องไม่มีพิษและไม่เกิดโทษต่อร่างกาย + ''' + + keywords = extract_keywords(text) + + # output: ['อวัยวะต่างๆ', + # 'ซ่อมแซมส่วน', + # 'เจริญเติบโต', + # 'ควบคุมการเปลี่ยนแปลง', + # 'มีพิษ'] + + keywords = extract_keywords(text, max_keywords=10) + + # output: ['อวัยวะต่างๆ', + # 'ซ่อมแซมส่วน', + # 'เจริญเติบโต', + # 'ควบคุมการเปลี่ยนแปลง', + # 'มีพิษ', + # 'ทำให้ร่างกาย', + # 'ร่างกายเจริญเติบโต', + # 'จะทำให้เกิด', + # 'มีพิษและ', + # 'เกิดโทษ'] + + """ + + def rank_by_frequency( + text: str, + max_keywords: int = 5, + min_df: int = 5, + tokenizer: str = "newmm", + stop_words: Optional[Iterable[str]] = None, + ): + from pythainlp.util.keywords import rank + from pythainlp.tokenize import word_tokenize + + tokens = word_tokenize(text, engine=tokenizer, keep_whitespace=False) + + use_custom_stop_words = stop_words is not None + + if use_custom_stop_words: + tokens = [token for token in tokens if token not in stop_words] + + word_rank = rank(tokens, exclude_stopwords=not use_custom_stop_words) + + keywords = [ + kw + for kw, cnt in word_rank.most_common(max_keywords) + if cnt >= min_df + ] + + return keywords + + engines = ["keybert", "frequency"] + + if engine == "keybert": + from .keybert import KeyBERT + + keywords = KeyBERT().extract_keywords( + text, + keyphrase_ngram_range=keyphrase_ngram_range, + max_keywords=max_keywords, + min_df=min_df, + tokenizer=tokenizer, + return_similarity=False, + stop_words=stop_words, + ) + elif engine == "frequency": + return rank_by_frequency( + text, + max_keywords=max_keywords, + min_df=min_df, + tokenizer=tokenizer, + stop_words=stop_words, + ) + + else: + # currently not supported + raise ValueError( + f"Keyword extractor {repr(engine)} is currently not supported. " + f"Use one of {engines}." + ) + + return keywords diff --git a/pythainlp/summarize/keybert.py b/pythainlp/summarize/keybert.py new file mode 100644 index 000000000..9be538fe1 --- /dev/null +++ b/pythainlp/summarize/keybert.py @@ -0,0 +1,226 @@ +# -*- coding: utf-8 -*- +""" +Minimal re-implementation of KeyBERT. + +KeyBERT is a minimal and easy-to-use keyword extraction technique +that leverages BERT embeddings to create keywords and keyphrases +that are most similar to a document. + +https://github.com/MaartenGr/KeyBERT +""" +from typing import List, Optional, Iterable, Tuple, Union +from collections import Counter + +import numpy as np +from transformers import pipeline + +from pythainlp.corpus import thai_stopwords +from pythainlp.tokenize import word_tokenize + + +class KeyBERT: + def __init__( + self, model_name: str = "airesearch/wangchanberta-base-att-spm-uncased" + ): + self.ft_pipeline = pipeline( + "feature-extraction", + tokenizer=model_name, + model=model_name, + revision="main", + ) + + def extract_keywords( + self, + text: str, + keyphrase_ngram_range: Tuple[int, int] = (1, 2), + max_keywords: int = 5, + min_df: int = 1, + tokenizer: str = "newmm", + return_similarity=False, + stop_words: Optional[Iterable[str]] = None, + ) -> Union[List[str], List[Tuple[str, float]]]: + """ + Extract Thai keywords and/or keyphrases with KeyBERT algorithm. + See https://github.com/MaartenGr/KeyBERT. + + :param str text: text to be summarized + :param Tuple[int, int] keyphrase_ngram_range: Number of token units to be defined as keyword. + The token unit varies w.r.t. `tokenizer_engine`. + For instance, (1, 1) means each token (unigram) can be a keyword (e.g. "เสา", "ไฟฟ้า"), + (1, 2) means one and two consecutive tokens (unigram and bigram) can be keywords + (e.g. "เสา", "ไฟฟ้า", "เสาไฟฟ้า") (default: (1, 2)) + :param int max_keywords: Number of maximum keywords to be returned. (default: 5) + :param int min_df: Minimum frequency required to be a keyword. (default: 1) + :param str tokenizer: Name of tokenizer engine to use. + Refer to options in :func: `pythainlp.tokenize.word_tokenizer() (default: 'newmm') + :param bool return_similarity: If `True`, return keyword scores. (default: False) + :param Optional[Iterable[str]] stop_words: A list of stop words (a.k.a words to be ignored). + If not specified, :func:`pythainlp.corpus.thai_stopwords` is used. (default: None) + + :return: list of keywords with score + + :Example: + :: + + from pythainlp.summarize.keybert import KeyBERT + + text = ''' + อาหาร หมายถึง ของแข็งหรือของเหลว + ที่กินหรือดื่มเข้าสู่ร่างกายแล้ว + จะทำให้เกิดพลังงานและความร้อนแก่ร่างกาย + ทำให้ร่างกายเจริญเติบโต + ซ่อมแซมส่วนที่สึกหรอ ควบคุมการเปลี่ยนแปลงต่างๆ ในร่างกาย + ช่วยทำให้อวัยวะต่างๆ ทำงานได้อย่างปกติ + อาหารจะต้องไม่มีพิษและไม่เกิดโทษต่อร่างกาย + ''' + + kb = KeyBERT() + + keywords = kb.extract_keyword(text) + + # output: ['อวัยวะต่างๆ', + # 'ซ่อมแซมส่วน', + # 'เจริญเติบโต', + # 'ควบคุมการเปลี่ยนแปลง', + # 'มีพิษ'] + + keywords = kb.extract_keyword(text, max_keywords=10, return_similarity=True) + + # output: [('อวัยวะต่างๆ', 0.3228477063109462), + # ('ซ่อมแซมส่วน', 0.31320597838000375), + # ('เจริญเติบโต', 0.29115434699705506), + # ('ควบคุมการเปลี่ยนแปลง', 0.2678430841321016), + # ('มีพิษ', 0.24996827960821494), + # ('ทำให้ร่างกาย', 0.23876962942443258), + # ('ร่างกายเจริญเติบโต', 0.23191285218852364), + # ('จะทำให้เกิด', 0.22425422716846247), + # ('มีพิษและ', 0.22162962875299588), + # ('เกิดโทษ', 0.20773497763458507)] + + """ + try: + text = text.strip() + except AttributeError: + raise AttributeError( + f"Unable to process data of type {type(text)}. " + f"Please provide input of string type." + ) + + if not text: + return [] + + # generate all list of keyword / keyphrases + stop_words_ = stop_words if stop_words else thai_stopwords() + kw_candidates = _generate_ngrams( + text, keyphrase_ngram_range, min_df, tokenizer, stop_words_ + ) + + # create document and word vectors + doc_vector = self.embed(text) + kw_vectors = self.embed(kw_candidates) + + # rank keywords + keywords = _rank_keywords( + doc_vector, kw_vectors, kw_candidates, max_keywords + ) + + if return_similarity: + return keywords + else: + return [kw for kw, _ in keywords] + + def embed(self, docs: Union[str, List[str]]) -> np.ndarray: + """ + Create an embedding of each input in `docs` by averaging vectors from last hidden layer. + """ + embs = self.ft_pipeline(docs) + if isinstance(docs, str) or len(docs) == 1: + # embed doc. return shape = [1, hidden_size] + emb_mean = np.array(embs).mean(axis=1) + else: + # mean of embedding of each word + # return shape = [len(docs), hidden_size] + emb_mean = np.stack( + [np.array(emb[0]).mean(axis=0) for emb in embs] + ) + + return emb_mean + + +def _generate_ngrams( + doc: str, + keyphrase_ngram_range: Tuple[int, int], + min_df: int, + tokenizer_engine: str, + stop_words: Iterable[str], +) -> List[str]: + assert keyphrase_ngram_range[0] >= 1, ( + f"`keyphrase_ngram_range` must start from 1. " + f"current value={keyphrase_ngram_range}." + ) + + assert keyphrase_ngram_range[0] <= keyphrase_ngram_range[1], ( + f"The value first argument of `keyphrase_ngram_range` must not exceed the second. " + f"current value={keyphrase_ngram_range}." + ) + + def _join_ngram(ngrams: List[Tuple[str, str]]) -> List[str]: + ngrams_joined = [] + for ng in ngrams: + joined = "".join(ng) + if joined.strip() == joined: + # ngram must not start or end with whitespace as this may cause duplication. + ngrams_joined.append(joined) + return ngrams_joined + + words = word_tokenize(doc, engine=tokenizer_engine) + all_grams = [] + ngram_range = (keyphrase_ngram_range[0], keyphrase_ngram_range[1] + 1) + for n in range(*ngram_range): + if n == 1: + # filter out space + ngrams = [word for word in words if word.strip()] + else: + ngrams_tuple = zip(*[words[i:] for i in range(n)]) + ngrams = _join_ngram(ngrams_tuple) + + ngrams_cnt = Counter(ngrams) + ngrams = [ + word + for word, freq in ngrams_cnt.items() + if (freq >= min_df) and (word not in stop_words) + ] + all_grams.extend(ngrams) + + return all_grams + + +def _rank_keywords( + doc_vector: np.ndarray, + word_vectors: np.ndarray, + keywords: List[str], + max_keywords: int, +) -> List[Tuple[str, float]]: + def l2_norm(v: np.ndarray) -> np.ndarray: + vec_size = v.shape[1] + result = np.divide( + v, + np.linalg.norm(v, axis=1).reshape(-1, 1).repeat(vec_size, axis=1), + ) + assert np.isclose( + np.linalg.norm(result, axis=1), 1 + ).all(), "Cannot normalize a vector to unit vector." + return result + + def cosine_sim(a: np.ndarray, b: np.ndarray) -> np.ndarray: + return (np.matmul(a, b.T).T).sum(axis=1) + + doc_vector = l2_norm(doc_vector) + word_vectors = l2_norm(word_vectors) + cosine_sims = cosine_sim(doc_vector, word_vectors) + ranking_desc = np.argsort(-cosine_sims) + + final_ranks = [ + (keywords[r], cosine_sims[r]) for r in ranking_desc[:max_keywords] + ] + return final_ranks diff --git a/tests/test_summarize.py b/tests/test_summarize.py index 98b670343..e00e1e909 100644 --- a/tests/test_summarize.py +++ b/tests/test_summarize.py @@ -2,7 +2,7 @@ import unittest -from pythainlp.summarize import summarize +from pythainlp.summarize import summarize, extract_keywords class TestSummarizePackage(unittest.TestCase): @@ -27,3 +27,75 @@ def test_summarize(self): self.assertIsNotNone(summarize(text, 1, engine="XX")) with self.assertRaises(ValueError): self.assertIsNotNone(summarize(text, 1, engine="mt5-cat")) + + def test_keyword_extraction(self): + text = ( + "อาหาร หมายถึง ของแข็งหรือของเหลว " + "ที่กินหรือดื่มเข้าสู่ร่างกายแล้ว " + "จะทำให้เกิดพลังงานและความร้อนแก่ร่างกาย " + "ทำให้ร่างกายเจริญเติบโต " + "ซ่อมแซมส่วนที่สึกหรอ ควบคุมการเปลี่ยนแปลงต่างๆ ในร่างกาย " + "ช่วยทำให้อวัยวะต่างๆ ทำงานได้อย่างปกติ " + "อาหารจะต้องไม่มีพิษและไม่เกิดโทษต่อร่างกาย" + ) + self.assertEqual(extract_keywords(""), []) + self.assertEqual(extract_keywords(" "), []) + + # test default engine, common case + keywords = extract_keywords(text) + expected = ["ซ่อมแซมส่วน", "เจริญเติบโต", "อวัยวะต่างๆ", "ควบคุมการเปลี่ยนแปลง"] + for exp_kw in expected: + self.assertIn(exp_kw, keywords) + + # test another engine + for max_kw in (5, 10): + keywords = extract_keywords(text, engine="frequency", max_keywords=max_kw) + self.assertEqual(len(keywords), max_kw) + + # test invalid engine + with self.assertRaises(ValueError): + extract_keywords(text, engine="random engine") + + # test different tokenizer + keywords = extract_keywords(text, tokenizer="attacut") + + expected = ["อวัยวะต่างๆ", "ซ่อมแซมส่วน", "เจริญเติบโต", "เกิดพลังงาน"] + for exp_kw in expected: + self.assertIn(exp_kw, keywords) + + # test overriding stop words + stpw = "เจริญเติบโต" + keywords = extract_keywords(text, stop_words=[stpw]) + self.assertNotIn(stpw, keywords) + + def test_keybert(self): + text = ( + "อาหาร หมายถึง ของแข็งหรือของเหลว " + "ที่กินหรือดื่มเข้าสู่ร่างกายแล้ว " + "จะทำให้เกิดพลังงานและความร้อนแก่ร่างกาย " + "ทำให้ร่างกายเจริญเติบโต " + "ซ่อมแซมส่วนที่สึกหรอ ควบคุมการเปลี่ยนแปลงต่างๆ ในร่างกาย " + "ช่วยทำให้อวัยวะต่างๆ ทำงานได้อย่างปกติ " + "อาหารจะต้องไม่มีพิษและไม่เกิดโทษต่อร่างกาย" + ) + + from pythainlp.summarize.keybert import KeyBERT + from pythainlp.tokenize import word_tokenize + + keybert = KeyBERT() + # test ngram range + ng_ranges = [(1, 1), (1, 2), (2, 2), (3, 3)] + for ng_min, ng_max in ng_ranges: + keywords = keybert.extract_keywords(text, keyphrase_ngram_range=(ng_min, ng_max)) + + for kw in keywords: + self.assertTrue(ng_min <= len(word_tokenize(kw)) <= ng_max) + + # test max_keywords + max_kws = 10 + keywords = keybert.extract_keywords(text, max_keywords=max_kws) + self.assertLessEqual(len(keywords), max_kws) + + text_short = "เฮลโหล" + keywords = keybert.extract_keywords(text_short, max_keywords=max_kws) + self.assertLessEqual(len(keywords), max_kws)