diff --git a/core/db_adapter/aioredis_adapter.py b/core/db_adapter/aioredis_adapter.py index e80e88a9..1ab32779 100644 --- a/core/db_adapter/aioredis_adapter.py +++ b/core/db_adapter/aioredis_adapter.py @@ -22,15 +22,15 @@ def __init__(self, config=None): except KeyError: pass - @monitoring.got_histogram_decorate("save_time") + @monitoring.got_histogram("save_time") async def save(self, id, data): return await self._run(self._save, id, data) - @monitoring.got_histogram_decorate("save_time") + @monitoring.got_histogram("save_time") async def replace_if_equals(self, id, sample, data): return await self._run(self._replace_if_equals, id, sample, data) - @monitoring.got_histogram_decorate("get_time") + @monitoring.got_histogram("get_time") async def get(self, id): return await self._run(self._get, id) diff --git a/core/db_adapter/aioredis_sentinel_adapter.py b/core/db_adapter/aioredis_sentinel_adapter.py index 070d6572..8379e888 100644 --- a/core/db_adapter/aioredis_sentinel_adapter.py +++ b/core/db_adapter/aioredis_sentinel_adapter.py @@ -24,16 +24,16 @@ def __init__(self, config=None): except KeyError: pass - @monitoring.got_histogram_decorate("save_time") + @monitoring.got_histogram("save_time") async def save(self, id, data): return await self._run(self._save, id, data) - @monitoring.got_histogram_decorate("save_time") + @monitoring.got_histogram("save_time") async def replace_if_equals(self, id, sample, data): return await self._run(self._replace_if_equals, id, sample, data) - @monitoring.got_histogram_decorate("get_time") + @monitoring.got_histogram("get_time") async def get(self, id): return await self._run(self._get, id) diff --git a/core/db_adapter/db_adapter.py b/core/db_adapter/db_adapter.py index 9ac6979a..93cc3227 100644 --- a/core/db_adapter/db_adapter.py +++ b/core/db_adapter/db_adapter.py @@ -61,15 +61,15 @@ def path_exists(self, path): def mtime(self, path): return self._run(self._mtime, path) - @monitoring.got_histogram_decorate("save_time") + @monitoring.got_histogram("save_time") def save(self, id, data): return self._run(self._save, id, data) - @monitoring.got_histogram_decorate("save_time") + @monitoring.got_histogram("save_time") def replace_if_equals(self, id, sample, data): return self._run(self._replace_if_equals, id, sample, data) - @monitoring.got_histogram_decorate("get_time") + @monitoring.got_histogram("get_time") def get(self, id): return self._run(self._get, id) diff --git a/core/monitoring/__init__.py b/core/monitoring/__init__.py deleted file mode 100644 index cf33c891..00000000 --- a/core/monitoring/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -from core.monitoring import monitoring - - -def init_monitoring(monitoring_class): - if not isinstance(monitoring.monitoring, monitoring_class): - monitoring.monitoring = monitoring_class() diff --git a/core/monitoring/monitoring.py b/core/monitoring/monitoring.py index fb865d00..605a1ea0 100644 --- a/core/monitoring/monitoring.py +++ b/core/monitoring/monitoring.py @@ -1,7 +1,32 @@ # coding: utf-8 import re -from prometheus_client import Counter, Histogram +from core.logging.logger_constants import KEY_NAME +from core.logging.logger_utils import log + +from prometheus_client import Counter, Histogram, REGISTRY + + +def _filter_monitoring_msg(msg): + msg = msg.replace("-", ":") + return msg + + +class MetricDisabled(ValueError): + pass + + +def silence_it(func): + def wrap(*args, **kwargs): + try: + func(*args, **kwargs) + except MetricDisabled as error: + log(f"Metrics: {error}", + params={KEY_NAME: "metrics_disabled"}, level="DEBUG") + except: + log("Metrics: Failed send. Exception occurred.", + params={KEY_NAME: "metrics_fail"}, level="ERROR", exc_info=True) + return wrap class Monitoring: @@ -12,12 +37,20 @@ class Monitoring: def __init__(self): self._enabled = self.DEFAULT_ENABLED + self.disabled_metrics = self.DEFAULT_DISABLED_METRICS.copy() + self.buckets = Histogram.DEFAULT_BUCKETS self._monitoring_items = { self.COUNTER: {}, self.HISTOGRAM: {} } - self.disabled_metrics = self.DEFAULT_DISABLED_METRICS.copy() - self.buckets = Histogram.DEFAULT_BUCKETS + self._clean_registry() + + @staticmethod + def _clean_registry(): + """При создании нового инстанса мониторинга удаляем все созданные коллекторы""" + collectors = list(REGISTRY._collector_to_names.keys()) + for collector in collectors: + REGISTRY.unregister(collector) def check_enabled(self, name: str): metric_disabled = next((True for m in self.disabled_metrics if re.fullmatch(m, name)), False) @@ -42,10 +75,10 @@ def got_counter(self, name, description=None, labels=()): if counter: counter.inc() - def got_histogram_decorate(self, name, description=None): + def got_histogram(self, name, description=None): def decor(func): def wrap(*args, **kwargs): - decor_ = self.got_histogram(name, description=None) + decor_ = self._got_histogram(name, description=None) if decor_: return decor_(func)(*args, **kwargs) else: @@ -53,7 +86,7 @@ def wrap(*args, **kwargs): return wrap return decor - def got_histogram(self, name, description=None): + def _got_histogram(self, name, description=None): if self.check_enabled(name): histogram = self._monitoring_items[self.HISTOGRAM] if not histogram.get(name): @@ -72,5 +105,201 @@ def apply_config(self, config): self.disabled_metrics = config.get("disabled_metrics", self.DEFAULT_DISABLED_METRICS.copy()) self.buckets = config.get("buckets", Histogram.DEFAULT_BUCKETS) + @silence_it + def init_metrics(self, app_name): + self._get_or_create_counter(_filter_monitoring_msg("{}_load_error".format(app_name)), "Load user data error") + self._get_or_create_counter(_filter_monitoring_msg("{}_save_error".format(app_name)), "Save user data error") + self._get_or_create_counter(_filter_monitoring_msg("{}_save_collision".format(app_name)), + "Save user data collision") + self._get_or_create_counter(_filter_monitoring_msg("{}_save_collision_tries_left".format(app_name)), + "Save user data collision all retries left.") + self._get_or_create_counter(_filter_monitoring_msg("{}_exception".format(app_name)), "Exception in run-time.") + self._get_or_create_counter(_filter_monitoring_msg("{}_invalid_message".format(app_name)), + "Incoming message validation error.") + + def _get_or_create_counter(self, monitoring_msg, descr, labels=()): + counter = monitoring.get_counter(monitoring_msg, descr, labels) + if counter is None: + raise MetricDisabled('counter disabled') + return counter + + @silence_it + def counter_incoming(self, app_name, message_name, handler, user, app_info=None): + monitoring_msg = _filter_monitoring_msg("{}_incoming".format(app_name)) + + c = self._get_or_create_counter(monitoring_msg, "Count of incoming messages", + ['message_name', 'handler', 'project_id', 'system_name', 'application_id', + 'app_version_id', 'channel', 'surface']) + if app_info is not None: + project_id = app_info.project_id + system_name = app_info.system_name + application_id = app_info.application_id + app_version_id = app_info.app_version_id + else: + project_id = system_name = application_id = app_version_id = None + c.labels(message_name, handler, project_id, system_name, + application_id, app_version_id, user.message.channel, user.message.device.surface).inc() + + @silence_it + def counter_outgoing(self, app_name, message_name, outgoing_message, user): + + monitoring_msg = _filter_monitoring_msg("{}_outgoing".format(app_name)) + + c = self._get_or_create_counter(monitoring_msg, "Count of outgoing requests from application.", + ['message_name', 'project_id', 'system_name', 'application_id', + 'app_version_id', 'channel', 'surface']) + app_info = user.message.app_info + c.labels(message_name, app_info.project_id, app_info.system_name, + app_info.application_id, + app_info.app_version_id, user.message.channel, user.message.device.surface).inc() + + @silence_it + def counter_scenario_change(self, app_name, scenario, user): + monitoring_msg = "{}_scenario_change".format(app_name) + + c = self._get_or_create_counter(monitoring_msg, "Count of scenario change events", + ['scenario', 'project_id', 'system_name', 'application_id', + 'app_version_id', 'channel', 'surface']) + app_info = user.message.app_info + c.labels(scenario, app_info.project_id, app_info.system_name, + app_info.application_id, + app_info.app_version_id, user.message.channel, user.message.device.surface).inc() + + @silence_it + def counter_nothing_found(self, app_name, scenario, user): + monitoring_msg = "{}_outgoing_nothing_found".format(app_name) + c = self._get_or_create_counter(monitoring_msg, "Count of scenario nothing found events", + ['scenario', 'project_id', 'system_name', 'application_id', + 'app_version_id', 'channel', 'surface']) + app_info = user.message.app_info + c.labels(scenario, app_info.project_id, app_info.system_name, + app_info.application_id, + app_info.app_version_id, user.message.channel, user.message.device.surface).inc() + + @silence_it + def counter_load_error(self, app_name): + monitoring_msg = "{}_load_error".format(app_name) + c = self._get_or_create_counter(_filter_monitoring_msg(monitoring_msg), "Load user data error") + c.inc() + + @silence_it + def counter_save_error(self, app_name): + monitoring_msg = "{}_save_error".format(app_name) + c = self._get_or_create_counter(_filter_monitoring_msg(monitoring_msg), "Save user data error") + c.inc() + + @silence_it + def counter_save_collision(self, app_name): + monitoring_msg = "{}_save_collision".format(app_name) + c = self._get_or_create_counter(_filter_monitoring_msg(monitoring_msg), "Save user data collision") + c.inc() + + @silence_it + def counter_save_collision_tries_left(self, app_name): + monitoring_msg = "{}_save_collision_tries_left".format(app_name) + c = self._get_or_create_counter(_filter_monitoring_msg(monitoring_msg), + "Save user data collision all retries left.") + c.inc() + + @silence_it + def counter_exception(self, app_name): + monitoring_msg = "{}_exception".format(app_name) + c = self._get_or_create_counter(_filter_monitoring_msg(monitoring_msg), "Exception in run-time.") + c.inc() + + @silence_it + def counter_invalid_message(self, app_name): + monitoring_msg = "{}_invalid_message".format(app_name) + c = self._get_or_create_counter(_filter_monitoring_msg(monitoring_msg), "Incoming message validation error.") + c.inc() + + @silence_it + def counter_behavior_success(self, app_name, request_message_name): + # 'Number of Success replies on messageName' + self._behavior_monitoing_by_status(app_name, "success", request_message_name) + + @silence_it + def counter_behavior_fail(self, app_name, request_message_name): + # 'Number of Fail replies on messageName' + self._behavior_monitoing_by_status(app_name, "fail", request_message_name) + + @silence_it + def counter_behavior_misstate(self, app_name, request_message_name): + # 'Number of Misstate replies on messageName' + self._behavior_monitoing_by_status(app_name, "misstate", request_message_name) + + @silence_it + def counter_behavior_timeout(self, app_name, request_message_name): + # 'Number of Timeout replies on messageName' + self._behavior_monitoing_by_status(app_name, "timeout", request_message_name) + + @silence_it + def counter_behavior_expire(self, app_name, request_message_name): + # 'Number of expire events on messageName' + self._behavior_monitoing_by_status(app_name, "expire", request_message_name) + + def _behavior_monitoing_by_status(self, app_name, status, request_message_name): + monitoring_msg = '{}_callback'.format(app_name) + c = self._get_or_create_counter(monitoring_msg, + "Count of incoming callback events with request_message_name", + ['request_message_name', 'status']) + + c.labels(request_message_name, status).inc() + + @silence_it + def counter_host_has_changed(self, app_name): + monitoring_msg = '{}_host_has_changed'.format(app_name) + c = self._get_or_create_counter(monitoring_msg, + "Count of host has changed events within one message_id") + c.inc() + + @silence_it + def counter_mq_long_waiting(self, app_name): + monitoring_msg = "{}_mq_long_waiting".format(app_name) + c = self._get_or_create_counter(_filter_monitoring_msg(monitoring_msg), + "(Now - creation_time) is greater than threshold") + c.inc() + + @silence_it + def sampling_load_time(self, app_name, value): + monitoring_msg = "{}_load_time".format(app_name) + monitoring.got_histogram_observe(_filter_monitoring_msg(monitoring_msg), value) + + @silence_it + def sampling_script_time(self, app_name, value): + monitoring_msg = "{}_script_time".format(app_name) + monitoring.got_histogram_observe(_filter_monitoring_msg(monitoring_msg), value) + + @silence_it + def sampling_save_time(self, app_name, value): + monitoring_msg = "{}_save_time".format(app_name) + monitoring.got_histogram_observe(_filter_monitoring_msg(monitoring_msg), value) + + @silence_it + def sampling_mq_waiting_time(self, app_name, value): + monitoring_msg = "{}_mq_waiting_time".format(app_name) + monitoring.got_histogram_observe(_filter_monitoring_msg(monitoring_msg), value) + + +class Proxy: + def __init__(self, default_cls): + self.instance = default_cls() + + def got_histogram(self, *args_, **kwargs_): + def decor_(func): + def wrap(*args, **kwargs): + wrapped_func = self.instance.got_histogram(*args_, **kwargs_)(func) + value = wrapped_func(*args, **kwargs) + return value + return wrap + return decor_ + + def set_instance(self, cls): + if type(self.instance) != cls: + self.instance = cls() + + def __getattr__(self, item): + return getattr(self.instance, item) + -monitoring = Monitoring() +monitoring = Proxy(Monitoring) diff --git a/scenarios/behaviors/behaviors.py b/scenarios/behaviors/behaviors.py index ce397d05..98cd534d 100644 --- a/scenarios/behaviors/behaviors.py +++ b/scenarios/behaviors/behaviors.py @@ -10,7 +10,7 @@ from core.text_preprocessing.preprocessing_result import TextPreprocessingResult from core.utils.pickle_copy import pickle_deepcopy from scenarios.actions.action_params_names import TO_MESSAGE_NAME, TO_MESSAGE_PARAMS, LOCAL_VARS -from smart_kit.utils.monitoring import smart_kit_metrics +from core.monitoring.monitoring import monitoring class Behaviors: @@ -136,7 +136,7 @@ def success(self, callback_id: str): self._log_callback( callback_id, "behavior_success", - smart_kit_metrics.counter_behavior_success, + monitoring.counter_behavior_success, "success", callback_action_params, ) @@ -158,7 +158,7 @@ def fail(self, callback_id: str): behavior = self.descriptions[callback.behavior_id] callback_action_params = callback.action_params self._log_callback(callback_id, "behavior_fail", - smart_kit_metrics.counter_behavior_fail, "fail", + monitoring.counter_behavior_fail, "fail", callback_action_params) text_preprocessing_result = TextPreprocessingResult(callback.text_preprocessing_result) result = behavior.fail_action.run(self._user, text_preprocessing_result, callback_action_params) @@ -177,7 +177,7 @@ def timeout(self, callback_id: str): behavior = self.descriptions[callback.behavior_id] callback_action_params = callback.action_params self._log_callback(callback_id, "behavior_timeout", - smart_kit_metrics.counter_behavior_timeout, "timeout", + monitoring.counter_behavior_timeout, "timeout", callback_action_params) text_preprocessing_result = TextPreprocessingResult(callback.text_preprocessing_result) result = behavior.timeout_action.run(self._user, text_preprocessing_result, callback_action_params) @@ -197,7 +197,7 @@ def misstate(self, callback_id: str): behavior = self.descriptions[callback.behavior_id] callback_action_params = callback.action_params self._log_callback(callback_id, "behavior_misstate", - smart_kit_metrics.counter_behavior_misstate, "misstate", + monitoring.counter_behavior_misstate, "misstate", callback_action_params) text_preprocessing_result = TextPreprocessingResult(callback.text_preprocessing_result) result = behavior.misstate_action.run(self._user, text_preprocessing_result, callback_action_params) @@ -248,7 +248,7 @@ def expire(self): callback_action_params = self.get_callback_action_params(callback_id) to_message_name = callback_action_params.get(TO_MESSAGE_NAME) app_info = callback_action_params.get(APP_INFO, {}) - smart_kit_metrics.counter_behavior_expire(self._user.settings.app_name, to_message_name) + monitoring.counter_behavior_expire(self._user.settings.app_name, to_message_name) log_params = {log_const.KEY_NAME: "behavior_expire", log_const.BEHAVIOR_CALLBACK_ID_VALUE: callback_id, log_const.BEHAVIOR_DATA_VALUE: str(self._callbacks[callback_id]), @@ -300,4 +300,4 @@ def _check_hostname(self, callback_id, callback) -> None: log_const.LAST_HOSTNAME: callback.hostname, } ) - smart_kit_metrics.counter_host_has_changed(self._user.settings.app_name) + monitoring.counter_host_has_changed(self._user.settings.app_name) diff --git a/scenarios/scenario_descriptions/form_filling_scenario.py b/scenarios/scenario_descriptions/form_filling_scenario.py index fece0018..8dbe0f02 100644 --- a/scenarios/scenario_descriptions/form_filling_scenario.py +++ b/scenarios/scenario_descriptions/form_filling_scenario.py @@ -174,7 +174,7 @@ def get_reply(self, user, text_preprocessing_result, reply_actions, field, form) user.last_scenarios.delete(self.id) return action_messages - @monitoring.got_histogram_decorate("scenario_time") + @monitoring.got_histogram("scenario_time") def run(self, text_preprocessing_result, user, params: Dict[str, Any] = None): form = self._get_form(user) user.last_scenarios.add(self.id, text_preprocessing_result) diff --git a/scenarios/scenario_descriptions/tree_scenario/tree_scenario.py b/scenarios/scenario_descriptions/tree_scenario/tree_scenario.py index 8274b9d5..8c2a60a4 100644 --- a/scenarios/scenario_descriptions/tree_scenario/tree_scenario.py +++ b/scenarios/scenario_descriptions/tree_scenario/tree_scenario.py @@ -84,7 +84,7 @@ def get_fields_data(self, main_form, form_type): all_forms_fields.update(form_field_data) return all_forms_fields - @monitoring.got_histogram_decorate("scenario_time") + @monitoring.got_histogram("scenario_time") def run(self, text_preprocessing_result, user, params: Dict[str, Any] = None): main_form = self._get_form(user) user.last_scenarios.add(self.id, text_preprocessing_result) diff --git a/scenarios/user/user_model.py b/scenarios/user/user_model.py index 75af4818..d6810cbb 100644 --- a/scenarios/user/user_model.py +++ b/scenarios/user/user_model.py @@ -15,7 +15,7 @@ from scenarios.scenario_models.history import History from scenarios.behaviors.behaviors import Behaviors from scenarios.user.reply_selector.reply_selector import ReplySelector -from smart_kit.utils.monitoring import smart_kit_metrics +from core.monitoring.monitoring import monitoring import scenarios.logging.logger_constants as log_const class User(BaseUser): @@ -34,7 +34,7 @@ def __init__(self, id, message, db_data, settings, descriptions, parametrizer_cl user_values = json.loads(db_data) if db_data else None except ValueError: user_values = None - smart_kit_metrics.counter_load_error(settings.app_name) + monitoring.counter_load_error(settings.app_name) log(f"%(class_name)s.__init__ User load ValueError %(uid)s with user data {db_data}", params={log_const.KEY_NAME: log_const.FAILED_DB_INTERACTION, "uid": str(id)}, level="ERROR") diff --git a/smart_kit/configs/__init__.py b/smart_kit/configs/__init__.py index 88aacc19..ba97d408 100644 --- a/smart_kit/configs/__init__.py +++ b/smart_kit/configs/__init__.py @@ -29,10 +29,9 @@ def get_app_config(environment_variable=ENVIRONMENT_VARIABLE): set_default(app_config, "REFERENCES_PATH", references_path) # import and init monitoring first - because other classes use a singleton instance of Monitoring - from core.monitoring.monitoring import Monitoring - from core.monitoring import init_monitoring + from core.monitoring.monitoring import Monitoring, monitoring set_default(app_config, 'MONITORING', Monitoring) - init_monitoring(app_config.MONITORING) + monitoring.set_instance(app_config.MONITORING) from core.logging.logger_utils import LoggerMessageCreator from core.message.from_message import SmartAppFromMessage diff --git a/smart_kit/handlers/handle_respond.py b/smart_kit/handlers/handle_respond.py index d4d2cf9c..18e8e206 100644 --- a/smart_kit/handlers/handle_respond.py +++ b/smart_kit/handlers/handle_respond.py @@ -7,7 +7,7 @@ from smart_kit.handlers.handler_base import HandlerBase from smart_kit.names.action_params_names import TO_MESSAGE_NAME, SEND_TIMESTAMP -from smart_kit.utils.monitoring import smart_kit_metrics +from core.monitoring.monitoring import monitoring class HandlerRespond(HandlerBase): @@ -44,7 +44,7 @@ def run(self, payload, user): text_preprocessing_result = TextPreprocessingResult(payload.get("message", {})) - smart_kit_metrics.counter_incoming(self.app_name, user.message.message_name, self.__class__.__name__, user) + monitoring.counter_incoming(self.app_name, user.message.message_name, self.__class__.__name__, user) params = { log_const.KEY_NAME: log_const.NORMALIZED_TEXT_VALUE, diff --git a/smart_kit/handlers/handle_server_action.py b/smart_kit/handlers/handle_server_action.py index 3eeb10c2..8c7cee46 100644 --- a/smart_kit/handlers/handle_server_action.py +++ b/smart_kit/handlers/handle_server_action.py @@ -6,7 +6,7 @@ from core.utils.pickle_copy import pickle_deepcopy from smart_kit.handlers.handler_base import HandlerBase -from smart_kit.utils.monitoring import smart_kit_metrics +from core.monitoring.monitoring import monitoring class HandlerServerAction(HandlerBase): @@ -30,7 +30,7 @@ def run(self, payload, user): log("HandlerServerAction %(server_action_id)s started", user, params) app_info = user.message.app_info - smart_kit_metrics.counter_incoming(self.app_name, user.message.message_name, self.__class__.__name__, + monitoring.counter_incoming(self.app_name, user.message.message_name, self.__class__.__name__, user, app_info=app_info) action_id = self.get_action_name(payload, user) diff --git a/smart_kit/handlers/handler_base.py b/smart_kit/handlers/handler_base.py index 697ef96b..986e61d2 100644 --- a/smart_kit/handlers/handler_base.py +++ b/smart_kit/handlers/handler_base.py @@ -1,5 +1,5 @@ # coding: utf-8 -from smart_kit.utils.monitoring import smart_kit_metrics +from core.monitoring.monitoring import monitoring class HandlerBase: @@ -11,5 +11,5 @@ def __init__(self, app_name): def run(self, payload, user): # отправка события о входящем сообщении в систему мониторинга - smart_kit_metrics.counter_incoming(self.app_name, user.message.message_name, self.__class__.__name__, + monitoring.counter_incoming(self.app_name, user.message.message_name, self.__class__.__name__, user, app_info=user.message.app_info) diff --git a/smart_kit/handlers/handler_timeout.py b/smart_kit/handlers/handler_timeout.py index 570bcf5e..fa32e787 100644 --- a/smart_kit/handlers/handler_timeout.py +++ b/smart_kit/handlers/handler_timeout.py @@ -5,7 +5,7 @@ from smart_kit.message.app_info import AppInfo from core.names.field import APP_INFO from smart_kit.names.message_names import RUN_APP, MESSAGE_TO_SKILL, SERVER_ACTION -from smart_kit.utils.monitoring import smart_kit_metrics +from core.monitoring.monitoring import monitoring class HandlerTimeout(HandlerBase): @@ -24,7 +24,7 @@ def run(self, payload, user): app_info = AppInfo(action_params[original_message_name].get(APP_INFO, {})) break - smart_kit_metrics.counter_incoming(self.app_name, user.message.message_name, self.__class__.__name__, + monitoring.counter_incoming(self.app_name, user.message.message_name, self.__class__.__name__, user, app_info=app_info) callback_id = user.message.callback_id diff --git a/smart_kit/models/dialogue_manager.py b/smart_kit/models/dialogue_manager.py index afeaa235..648d4345 100644 --- a/smart_kit/models/dialogue_manager.py +++ b/smart_kit/models/dialogue_manager.py @@ -7,8 +7,7 @@ from scenarios.scenario_descriptions.form_filling_scenario import FormFillingScenario from smart_kit.system_answers.nothing_found_action import NothingFoundAction -from smart_kit.utils.monitoring import smart_kit_metrics - +from core.monitoring.monitoring import monitoring class DialogueManager: NOTHING_FOUND_ACTION = "nothing_found_action" @@ -43,7 +42,7 @@ def run(self, text_preprocessing_result, user): if scenario.check_ask_again_requests(text_preprocessing_result, user, params): reply = scenario.ask_again(text_preprocessing_result, user, params) return reply, True - smart_kit_metrics.counter_nothing_found(self.app_name, scenario_key, user) + monitoring.counter_nothing_found(self.app_name, scenario_key, user) return self._nothing_found_action.run(user, text_preprocessing_result), False return self.run_scenario(scenario_key, text_preprocessing_result, user), True @@ -59,6 +58,6 @@ def run_scenario(self, scen_id, text_preprocessing_result, user): actual_last_scenario = user.last_scenarios.last_scenario_name if actual_last_scenario and actual_last_scenario != initial_last_scenario: - smart_kit_metrics.counter_scenario_change(self.app_name, actual_last_scenario, user) + monitoring.counter_scenario_change(self.app_name, actual_last_scenario, user) return run_scenario_result diff --git a/smart_kit/models/smartapp_model.py b/smart_kit/models/smartapp_model.py index bf5a4855..50853a09 100644 --- a/smart_kit/models/smartapp_model.py +++ b/smart_kit/models/smartapp_model.py @@ -16,7 +16,7 @@ from smart_kit.handlers.handle_server_action import HandlerServerAction from smart_kit.resources import SmartAppResources from smart_kit.utils import get_callback_action_params, set_debug_info -from smart_kit.utils.monitoring import smart_kit_metrics +from core.monitoring.monitoring import monitoring class SmartAppModel: @@ -68,7 +68,7 @@ def answer(self, message, user): def on_answer_error(self, message, user): user.do_not_save = True - smart_kit_metrics.counter_exception(self.app_name) + monitoring.counter_exception(self.app_name) params = {log_const.KEY_NAME: log_const.DIALOG_ERROR_VALUE, "message_id": user.message.incremental_id, "masked_message": user.message.masked_value} diff --git a/smart_kit/start_points/base_main_loop.py b/smart_kit/start_points/base_main_loop.py index 8b0baf31..e7ee92df 100644 --- a/smart_kit/start_points/base_main_loop.py +++ b/smart_kit/start_points/base_main_loop.py @@ -14,7 +14,6 @@ from core.basic_models.parametrizers.parametrizer import BasicParametrizer from core.message.msg_validator import MessageValidator from smart_kit.start_points.postprocess import PostprocessMainLoop -from smart_kit.utils.monitoring import smart_kit_metrics from smart_kit.models.smartapp_model import SmartAppModel @@ -94,7 +93,7 @@ def _create_health_check_server(self, settings): def _init_monitoring_config(self, template_settings): monitoring_config = template_settings["monitoring"] monitoring.apply_config(monitoring_config) - smart_kit_metrics.init_metrics(app_name=self.app_name) + monitoring.init_metrics(app_name=self.app_name) def load_user(self, db_uid, message): db_data = None @@ -105,7 +104,7 @@ def load_user(self, db_uid, message): log("Failed to get user data", params={log_const.KEY_NAME: log_const.FAILED_DB_INTERACTION, log_const.REQUEST_VALUE: str(message.value)}, level="ERROR") load_error = True - smart_kit_metrics.counter_load_error(self.app_name) + monitoring.counter_load_error(self.app_name) return self.user_cls( message.uid, message=message, @@ -135,9 +134,9 @@ def save_user(self, db_uid, user, message): except (DBAdapterException, ValueError): log("Failed to set user data", params={log_const.KEY_NAME: log_const.FAILED_DB_INTERACTION, log_const.REQUEST_VALUE: str(message.value)}, level="ERROR") - smart_kit_metrics.counter_save_error(self.app_name) + monitoring.counter_save_error(self.app_name) if not no_collisions: - smart_kit_metrics.counter_save_collision(self.app_name) + monitoring.counter_save_collision(self.app_name) return no_collisions def run(self): diff --git a/smart_kit/start_points/main_loop_async_http.py b/smart_kit/start_points/main_loop_async_http.py index 00e8a06a..551c4abd 100644 --- a/smart_kit/start_points/main_loop_async_http.py +++ b/smart_kit/start_points/main_loop_async_http.py @@ -13,7 +13,7 @@ from core.utils.stats_timer import StatsTimer from smart_kit.message.smartapp_to_message import SmartAppToMessage from smart_kit.start_points.main_loop_http import BaseHttpMainLoop -from smart_kit.utils.monitoring import smart_kit_metrics +from core.monitoring.monitoring import monitoring class AIOHttpMainLoop(BaseHttpMainLoop): @@ -49,7 +49,7 @@ async def load_user(self, db_uid, message): log("Failed to get user data", params={log_const.KEY_NAME: log_const.FAILED_DB_INTERACTION, log_const.REQUEST_VALUE: str(message.value)}, level="ERROR") load_error = True - smart_kit_metrics.counter_load_error(self.app_name) + monitoring.counter_load_error(self.app_name) return self.user_cls( message.uid, message=message, @@ -92,9 +92,9 @@ async def save_user(self, db_uid, user, message): except (DBAdapterException, ValueError): log("Failed to set user data", params={log_const.KEY_NAME: log_const.FAILED_DB_INTERACTION, log_const.REQUEST_VALUE: str(message.value)}, level="ERROR") - smart_kit_metrics.counter_save_error(self.app_name) + monitoring.counter_save_error(self.app_name) if not no_collisions: - smart_kit_metrics.counter_save_collision(self.app_name) + monitoring.counter_save_collision(self.app_name) return no_collisions def run(self): diff --git a/smart_kit/start_points/main_loop_kafka.py b/smart_kit/start_points/main_loop_kafka.py index f0bb323d..b64551b7 100644 --- a/smart_kit/start_points/main_loop_kafka.py +++ b/smart_kit/start_points/main_loop_kafka.py @@ -24,7 +24,7 @@ from smart_kit.names import message_names from smart_kit.request.kafka_request import SmartKitKafkaRequest from smart_kit.start_points.base_main_loop import BaseMainLoop -from smart_kit.utils.monitoring import smart_kit_metrics +from core.monitoring.monitoring import monitoring def _enrich_config_from_secret(kafka_config, secret_config): @@ -128,7 +128,7 @@ def _generate_answers(self, user, commands, message, **kwargs): else: answers.append(SmartAppToMessage(self.BAD_ANSWER_COMMAND, message=message, request=request)) - smart_kit_metrics.counter_outgoing(self.app_name, command.name, answer, user) + monitoring.counter_outgoing(self.app_name, command.name, answer, user) return answers @@ -194,7 +194,7 @@ def iterate_behavior_timeouts(self): "db_version": str(user.variables.get(user.USER_DB_VERSION))}, level="WARNING") - smart_kit_metrics.counter_save_collision_tries_left(self.app_name) + monitoring.counter_save_collision_tries_left(self.app_name) self.save_behavior_timeouts(user, mq_message, kafka_key) for answer in answers: self._send_request(user, answer, mq_message) @@ -232,7 +232,7 @@ def process_message(self, mq_message, consumer, kafka_key, stats): stats += "Waiting message: {} msecs\n".format(waiting_message_time) stats += "Mid: {}\n".format(message.incremental_id) - smart_kit_metrics.sampling_mq_waiting_time(self.app_name, waiting_message_time / 1000) + monitoring.sampling_mq_waiting_time(self.app_name, waiting_message_time / 1000) self.check_message_key(message, mq_message.key(), user) log( @@ -255,7 +255,7 @@ def process_message(self, mq_message, consumer, kafka_key, stats): db_uid = message.db_uid with StatsTimer() as load_timer: user = self.load_user(db_uid, message) - smart_kit_metrics.sampling_load_time(self.app_name, load_timer.secs) + monitoring.sampling_load_time(self.app_name, load_timer.secs) stats += "Loading time: {} msecs\n".format(load_timer.msecs) with StatsTimer() as script_timer: commands = self.model.answer(message, user) @@ -263,13 +263,13 @@ def process_message(self, mq_message, consumer, kafka_key, stats): answers = self._generate_answers(user=user, commands=commands, message=message, topic_key=topic_key, kafka_key=kafka_key) - smart_kit_metrics.sampling_script_time(self.app_name, script_timer.secs) + monitoring.sampling_script_time(self.app_name, script_timer.secs) stats += "Script time: {} msecs\n".format(script_timer.msecs) with StatsTimer() as save_timer: user_save_no_collisions = self.save_user(db_uid, user, message) - smart_kit_metrics.sampling_save_time(self.app_name, save_timer.secs) + monitoring.sampling_save_time(self.app_name, save_timer.secs) stats += "Saving time: {} msecs\n".format(save_timer.msecs) if not user_save_no_collisions: log( @@ -304,7 +304,7 @@ def process_message(self, mq_message, consumer, kafka_key, stats): log(f"Message validation failed, skip message handling.", params={log_const.KEY_NAME: "invalid_message", "data": data}, level="ERROR") - smart_kit_metrics.counter_invalid_message(self.app_name) + monitoring.counter_invalid_message(self.app_name) if user and not user_save_no_collisions: log( "MainLoop.iterate: db_save collision all tries left on uid %(uid)s db_version %(db_version)s.", @@ -318,7 +318,7 @@ def process_message(self, mq_message, consumer, kafka_key, stats): "db_version": str(user.variables.get(user.USER_DB_VERSION))}, level="WARNING") self.postprocessor.postprocess(user, message) - smart_kit_metrics.counter_save_collision_tries_left(self.app_name) + monitoring.counter_save_collision_tries_left(self.app_name) consumer.commit_offset(mq_message) def iterate(self, kafka_key): diff --git a/smart_kit/utils/monitoring.py b/smart_kit/utils/monitoring.py deleted file mode 100644 index 2b981fd1..00000000 --- a/smart_kit/utils/monitoring.py +++ /dev/null @@ -1,203 +0,0 @@ -from core.logging.logger_constants import KEY_NAME -from core.logging.logger_utils import log -from core.monitoring.monitoring import monitoring - - -def _filter_monitoring_msg(msg): - msg = msg.replace("-", ":") - return msg - - -class MetricDisabled(ValueError): - pass - - -def silence_it(func): - def wrap(*args, **kwargs): - try: - func(*args, **kwargs) - except MetricDisabled as error: - log(f"Metrics: {error}", - params={KEY_NAME: "metrics_disabled"}, level="DEBUG") - except: - log("Metrics: Failed send. Exception occurred.", - params={KEY_NAME: "metrics_fail"}, level="ERROR", exc_info=True) - return wrap - - -class Metrics: - - @silence_it - def init_metrics(self, app_name): - self._get_or_create_counter(_filter_monitoring_msg("{}_load_error".format(app_name)), "Load user data error") - self._get_or_create_counter(_filter_monitoring_msg("{}_save_error".format(app_name)), "Save user data error") - self._get_or_create_counter(_filter_monitoring_msg("{}_save_collision".format(app_name)), "Save user data collision") - self._get_or_create_counter(_filter_monitoring_msg("{}_save_collision_tries_left".format(app_name)), - "Save user data collision all retries left.") - self._get_or_create_counter(_filter_monitoring_msg("{}_exception".format(app_name)), "Exception in run-time.") - self._get_or_create_counter(_filter_monitoring_msg("{}_invalid_message".format(app_name)), - "Incoming message validation error.") - - def _get_or_create_counter(self, monitoring_msg, descr, labels=()): - counter = monitoring.get_counter(monitoring_msg, descr, labels) - if counter is None: - raise MetricDisabled('counter disabled') - return counter - - @silence_it - def counter_incoming(self, app_name, message_name, handler, user, app_info=None): - monitoring_msg = _filter_monitoring_msg("{}_incoming".format(app_name)) - - c = self._get_or_create_counter(monitoring_msg, "Count of incoming messages", - ['message_name', 'handler', 'project_id', 'system_name', 'application_id', - 'app_version_id', 'channel', 'surface']) - if app_info is not None: - project_id = app_info.project_id - system_name = app_info.system_name - application_id = app_info.application_id - app_version_id = app_info.app_version_id - else: - project_id = system_name = application_id = app_version_id = None - c.labels(message_name, handler, project_id, system_name, - application_id, app_version_id, user.message.channel, user.message.device.surface).inc() - - @silence_it - def counter_outgoing(self, app_name, message_name, outgoing_message, user): - - monitoring_msg = _filter_monitoring_msg("{}_outgoing".format(app_name)) - - c = self._get_or_create_counter(monitoring_msg, "Count of outgoing requests from application.", - ['message_name', 'project_id', 'system_name', 'application_id', - 'app_version_id', 'channel', 'surface']) - app_info = user.message.app_info - c.labels(message_name, app_info.project_id, app_info.system_name, - app_info.application_id, - app_info.app_version_id, user.message.channel, user.message.device.surface).inc() - - @silence_it - def counter_scenario_change(self, app_name, scenario, user): - monitoring_msg = "{}_scenario_change".format(app_name) - - c = self._get_or_create_counter(monitoring_msg, "Count of scenario change events", - ['scenario', 'project_id', 'system_name', 'application_id', - 'app_version_id', 'channel', 'surface']) - app_info = user.message.app_info - c.labels(scenario, app_info.project_id, app_info.system_name, - app_info.application_id, - app_info.app_version_id, user.message.channel, user.message.device.surface).inc() - - @silence_it - def counter_nothing_found(self, app_name, scenario, user): - monitoring_msg = "{}_outgoing_nothing_found".format(app_name) - c = self._get_or_create_counter(monitoring_msg, "Count of scenario nothing found events", - ['scenario', 'project_id', 'system_name', 'application_id', - 'app_version_id', 'channel', 'surface']) - app_info = user.message.app_info - c.labels(scenario, app_info.project_id, app_info.system_name, - app_info.application_id, - app_info.app_version_id, user.message.channel, user.message.device.surface).inc() - - @silence_it - def counter_load_error(self, app_name): - monitoring_msg = "{}_load_error".format(app_name) - c = self._get_or_create_counter(_filter_monitoring_msg(monitoring_msg), "Load user data error") - c.inc() - - @silence_it - def counter_save_error(self, app_name): - monitoring_msg = "{}_save_error".format(app_name) - c = self._get_or_create_counter(_filter_monitoring_msg(monitoring_msg), "Save user data error") - c.inc() - - @silence_it - def counter_save_collision(self, app_name): - monitoring_msg = "{}_save_collision".format(app_name) - c = self._get_or_create_counter(_filter_monitoring_msg(monitoring_msg), "Save user data collision") - c.inc() - - @silence_it - def counter_save_collision_tries_left(self, app_name): - monitoring_msg = "{}_save_collision_tries_left".format(app_name) - c = self._get_or_create_counter(_filter_monitoring_msg(monitoring_msg), "Save user data collision all retries left.") - c.inc() - - @silence_it - def counter_exception(self, app_name): - monitoring_msg = "{}_exception".format(app_name) - c = self._get_or_create_counter(_filter_monitoring_msg(monitoring_msg), "Exception in run-time.") - c.inc() - - @silence_it - def counter_invalid_message(self, app_name): - monitoring_msg = "{}_invalid_message".format(app_name) - c = self._get_or_create_counter(_filter_monitoring_msg(monitoring_msg), "Incoming message validation error.") - c.inc() - - @silence_it - def counter_behavior_success(self, app_name, request_message_name): - # 'Number of Success replies on messageName' - self._behavior_monitoing_by_status(app_name, "success", request_message_name) - - @silence_it - def counter_behavior_fail(self, app_name, request_message_name): - # 'Number of Fail replies on messageName' - self._behavior_monitoing_by_status(app_name, "fail", request_message_name) - - @silence_it - def counter_behavior_misstate(self, app_name, request_message_name): - # 'Number of Misstate replies on messageName' - self._behavior_monitoing_by_status(app_name, "misstate", request_message_name) - - @silence_it - def counter_behavior_timeout(self, app_name, request_message_name): - # 'Number of Timeout replies on messageName' - self._behavior_monitoing_by_status(app_name, "timeout", request_message_name) - - @silence_it - def counter_behavior_expire(self, app_name, request_message_name): - # 'Number of expire events on messageName' - self._behavior_monitoing_by_status(app_name, "expire", request_message_name) - - def _behavior_monitoing_by_status(self, app_name, status, request_message_name): - monitoring_msg = '{}_callback'.format(app_name) - c = self._get_or_create_counter(monitoring_msg, - "Count of incoming callback events with request_message_name", - ['request_message_name', 'status']) - - c.labels(request_message_name, status).inc() - - @silence_it - def counter_host_has_changed(self, app_name): - monitoring_msg = '{}_host_has_changed'.format(app_name) - c = self._get_or_create_counter(monitoring_msg, - "Count of host has changed events within one message_id") - c.inc() - - @silence_it - def counter_mq_long_waiting(self, app_name): - monitoring_msg = "{}_mq_long_waiting".format(app_name) - c = self._get_or_create_counter(_filter_monitoring_msg(monitoring_msg), "(Now - creation_time) is greater than threshold") - c.inc() - - @silence_it - def sampling_load_time(self, app_name, value): - monitoring_msg = "{}_load_time".format(app_name) - monitoring.got_histogram_observe(_filter_monitoring_msg(monitoring_msg), value) - - @silence_it - def sampling_script_time(self, app_name, value): - monitoring_msg = "{}_script_time".format(app_name) - monitoring.got_histogram_observe(_filter_monitoring_msg(monitoring_msg), value) - - @silence_it - def sampling_save_time(self, app_name, value): - monitoring_msg = "{}_save_time".format(app_name) - monitoring.got_histogram_observe(_filter_monitoring_msg(monitoring_msg), value) - - @silence_it - def sampling_mq_waiting_time(self, app_name, value): - monitoring_msg = "{}_mq_waiting_time".format(app_name) - monitoring.got_histogram_observe(_filter_monitoring_msg(monitoring_msg), value) - - -smart_kit_metrics = Metrics() diff --git a/tests/core_tests/monitoring_test/test_monitoring.py b/tests/core_tests/monitoring_test/test_monitoring.py index fc5f5827..bfef32ee 100644 --- a/tests/core_tests/monitoring_test/test_monitoring.py +++ b/tests/core_tests/monitoring_test/test_monitoring.py @@ -7,7 +7,7 @@ from smart_kit.utils.picklable_mock import PicklableMock -class MonitoringTest1(unittest.TestCase): +class MonitoringTest(unittest.TestCase): def setUp(self): self.logger = PicklableMock() self.logger.exception = PicklableMock() @@ -39,7 +39,7 @@ def test_got_histogram_disabled(self): event_name = "test_histogram" histogram_item = self.monitoring._monitoring_items[self.monitoring.HISTOGRAM] self.assertTrue(histogram_item == dict()) - histogram = self.monitoring.got_histogram(event_name) + histogram = self.monitoring._got_histogram(event_name) self.assertTrue(event_name not in histogram_item) self.assertIsNone(histogram) @@ -48,7 +48,7 @@ def test_got_histogram_enabled(self): event_name = "test_histogram" histogram_item = self.monitoring._monitoring_items[self.monitoring.HISTOGRAM] self.assertTrue(histogram_item == dict()) - self.monitoring.got_histogram(event_name) + self.monitoring._got_histogram(event_name) histogram_item = self.monitoring._monitoring_items[self.monitoring.HISTOGRAM] self.assertTrue(event_name in histogram_item) self.assertEqual(type(histogram_item[event_name]), type(Histogram('histogram_name', 'histogram_name'))) @@ -68,12 +68,44 @@ def test_got_histogram_disabled_by_name(self): self.assertTrue(event_name in counter_item, event_name) def test_monitoring_init(self): + from core.monitoring.monitoring import monitoring + class MyCustomMonitoring(Monitoring): pass - from core.monitoring import init_monitoring - init_monitoring(MyCustomMonitoring) - from core.monitoring import monitoring - self.assertIsInstance(monitoring.monitoring, MyCustomMonitoring) - init_monitoring(Monitoring) - self.assertIsInstance(monitoring.monitoring, Monitoring) + monitoring.set_instance(MyCustomMonitoring) + self.assertEqual(type(monitoring.instance), MyCustomMonitoring) + + monitoring.set_instance(Monitoring) + self.assertEqual(type(monitoring.instance), Monitoring) + + def test_decorator(self): + from core.monitoring.monitoring import monitoring + + class MyCustomMonitoring(Monitoring): + def got_histogram(self, name, description=None): + def decorator(function): + def wrapper(*args, **kwargs): + result = function(*args, **kwargs) + " " + name + return result + + return wrapper + + return decorator + + class SomeClass: + @monitoring.got_histogram("test_histogram") + def some_method(self): + return "test" + + obj = SomeClass() + self.assertEqual(obj.some_method(), "test") + + monitoring.set_instance(MyCustomMonitoring) + self.assertEqual(obj.some_method(), "test test_histogram") + + monitoring.set_instance(Monitoring) + self.assertEqual(obj.some_method(), "test") + + self.assertIn('test_histogram', monitoring._monitoring_items[Monitoring.HISTOGRAM].keys()) +