-
Notifications
You must be signed in to change notification settings - Fork 757
Implement filtering logic for min_severity and trace_based parameters #4765
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
549f0f0
6db2394
ff857b1
7f13cff
bf80f96
bdae3e0
63826b2
c580f92
2fb7f0a
4c4b337
c8ecfff
11f8cb5
45822b5
5fb6bc5
8bb4d15
e89621a
4eaecaa
000cad0
c766a11
311b467
709f594
3e8204e
2de7586
499b9ac
1f913eb
372d63d
d173368
6bef931
80f8a78
03ad66a
e54ba6f
4530403
9965bd3
8cedda6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,12 +11,16 @@ | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| # pylint: disable=too-many-lines | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import abc | ||
| import atexit | ||
| import base64 | ||
| import concurrent.futures | ||
| import fnmatch | ||
| import json | ||
| import logging | ||
| import threading | ||
|
|
@@ -61,6 +65,8 @@ | |
|
|
||
| _logger = logging.getLogger(__name__) | ||
|
|
||
| LoggerConfigurator = Callable[[InstrumentationScope], "LoggerConfig | None"] | ||
|
|
||
| _DEFAULT_OTEL_ATTRIBUTE_COUNT_LIMIT = 128 | ||
| _ENV_VALUE_UNSET = "" | ||
|
|
||
|
|
@@ -96,6 +102,81 @@ class LogDeprecatedInitWarning(UserWarning): | |
| warnings.simplefilter("once", LogDeprecatedInitWarning) | ||
|
|
||
|
|
||
| class LoggerConfig: | ||
| def __init__( | ||
| self, | ||
| disabled: bool = False, | ||
| minimum_severity_level: SeverityNumber = SeverityNumber.UNSPECIFIED, | ||
| trace_based_sampling: bool = False, | ||
| ): | ||
| """Initialize LoggerConfig with specified parameters. | ||
|
|
||
| Args: | ||
| disabled: A boolean indication of whether the logger is enabled. | ||
| If not explicitly set, defaults to False (i.e. Loggers are enabled by default). | ||
| If True, the logger behaves equivalently to a No-op Logger. | ||
| minimum_severity_level: A SeverityNumber indicating the minimum severity level | ||
| for log records to be processed. If not explicitly set, defaults to UNSPECIFIED (0). | ||
| If a log record's SeverityNumber is specified and is less than the configured | ||
| minimum_severity_level, the log record is dropped by the Logger. | ||
| trace_based_sampling: A boolean indication of whether the logger should only | ||
| process log records associated with sampled traces. If not explicitly set, | ||
| defaults to False. If True, log records associated with unsampled traces | ||
| are dropped by the Logger. | ||
| """ | ||
| self.disabled = disabled | ||
| self.minimum_severity_level = minimum_severity_level | ||
| self.trace_based_sampling = trace_based_sampling | ||
|
|
||
| def __repr__(self): | ||
| return ( | ||
| f"LoggerConfig(disabled={self.disabled}, " | ||
| f"minimum_severity_level={self.minimum_severity_level}, " | ||
| f"trace_based_sampling={self.trace_based_sampling})" | ||
| ) | ||
|
|
||
|
|
||
| def create_logger_configurator_by_name( | ||
| logger_configs: dict[str, LoggerConfig], | ||
| ) -> LoggerConfigurator: | ||
| """Create a LoggerConfigurator that selects configuration based on logger name. | ||
|
|
||
| Args: | ||
| logger_configs: A dictionary mapping logger names to LoggerConfig instances. | ||
| Loggers not found in this mapping will use the default config. | ||
|
|
||
| Returns: | ||
| A LoggerConfigurator function that can be used with LoggerProvider. | ||
| """ | ||
|
|
||
| def configurator(scope: InstrumentationScope) -> LoggerConfig | None: | ||
| return logger_configs.get(scope.name) | ||
|
|
||
| return configurator | ||
|
|
||
|
|
||
| def create_logger_configurator_with_pattern( | ||
| patterns: list[tuple[str, LoggerConfig]], | ||
| ) -> LoggerConfigurator: | ||
| """Create a LoggerConfigurator that matches logger names using patterns. | ||
|
|
||
| Args: | ||
| patterns: A list of (pattern, config) tuples. Patterns are matched in order, | ||
| and the first match is used. Use '*' as a wildcard. | ||
|
|
||
| Returns: | ||
| A LoggerConfigurator function that can be used with LoggerProvider. | ||
| """ | ||
|
|
||
| def configurator(scope: InstrumentationScope) -> LoggerConfig | None: | ||
| for pattern, config in patterns: | ||
| if fnmatch.fnmatch(scope.name, pattern): | ||
| return config | ||
| return None | ||
|
|
||
| return configurator | ||
|
|
||
|
|
||
| class LogLimits: | ||
| """This class is based on a SpanLimits class in the Tracing module. | ||
|
|
||
|
|
@@ -685,7 +766,15 @@ def __init__( | |
| ConcurrentMultiLogRecordProcessor, | ||
| ], | ||
| instrumentation_scope: InstrumentationScope, | ||
| config: LoggerConfig | None = None, | ||
| min_severity_level: SeverityNumber = SeverityNumber.UNSPECIFIED, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the LoggerConfigurator supposed to be on the LoggerProvider?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @aabmass Yes, |
||
| trace_based_sampling: bool = False, | ||
| ): | ||
| if config is not None: | ||
| self._config = config | ||
| else: | ||
| self._config = LoggerConfig() | ||
|
|
||
| super().__init__( | ||
| instrumentation_scope.name, | ||
| instrumentation_scope.version, | ||
|
|
@@ -695,11 +784,30 @@ def __init__( | |
| self._resource = resource | ||
| self._multi_log_record_processor = multi_log_record_processor | ||
| self._instrumentation_scope = instrumentation_scope | ||
| self._min_severity_level = min_severity_level | ||
| self._trace_based_sampling = trace_based_sampling | ||
|
|
||
| @property | ||
| def resource(self): | ||
| return self._resource | ||
|
|
||
| @property | ||
| def config(self): | ||
| return self._config | ||
|
|
||
| @property | ||
| def instrumentation_scope(self): | ||
| """Get the instrumentation scope for this logger.""" | ||
| return self._instrumentation_scope | ||
|
|
||
| def update_config(self, config: LoggerConfig) -> None: | ||
| """Update the logger's configuration. | ||
|
|
||
| Args: | ||
| config: The new LoggerConfig to use. | ||
| """ | ||
| self._config = config | ||
|
|
||
| @overload | ||
| def emit( | ||
| self, | ||
|
|
@@ -758,7 +866,20 @@ def emit( | |
| record=record, resource=self._resource | ||
| ) | ||
|
|
||
| log_data = LogData(record, self._instrumentation_scope) | ||
| if self._config.disabled: | ||
| return | ||
|
|
||
| if is_less_than_min_severity( | ||
| record, self._config.minimum_severity_level | ||
| ): | ||
| return | ||
|
|
||
| if should_drop_logs_for_unsampled_traces( | ||
| record, self._config.trace_based_sampling | ||
| ): | ||
| return | ||
|
|
||
| log_data = LogData(record, self._instrumentation_scope) | ||
|
|
||
| self._multi_log_record_processor.on_emit(log_data) | ||
|
|
||
|
|
@@ -771,6 +892,9 @@ def __init__( | |
| multi_log_record_processor: SynchronousMultiLogRecordProcessor | ||
| | ConcurrentMultiLogRecordProcessor | ||
| | None = None, | ||
| min_severity_level: SeverityNumber = SeverityNumber.UNSPECIFIED, | ||
| trace_based_sampling: bool = False, | ||
| logger_configurator: LoggerConfigurator | None = None, | ||
| ): | ||
| if resource is None: | ||
| self._resource = Resource.create({}) | ||
|
|
@@ -786,6 +910,23 @@ def __init__( | |
| self._at_exit_handler = atexit.register(self.shutdown) | ||
| self._logger_cache = {} | ||
| self._logger_cache_lock = Lock() | ||
| self._min_severity_level = min_severity_level | ||
| self._trace_based_sampling = trace_based_sampling | ||
|
|
||
| if logger_configurator is not None: | ||
| self._logger_configurator = logger_configurator | ||
| else: | ||
|
|
||
| def default_configurator( | ||
| scope: InstrumentationScope, | ||
| ) -> LoggerConfig: | ||
| return LoggerConfig( | ||
| disabled=self._disabled, | ||
| minimum_severity_level=self._min_severity_level, | ||
| trace_based_sampling=self._trace_based_sampling, | ||
| ) | ||
|
|
||
| self._logger_configurator = default_configurator | ||
|
|
||
| @property | ||
| def resource(self): | ||
|
|
@@ -798,15 +939,24 @@ def _get_logger_no_cache( | |
| schema_url: str | None = None, | ||
| attributes: _ExtendedAttributes | None = None, | ||
| ) -> Logger: | ||
| instrumentation_scope = InstrumentationScope( | ||
| name, | ||
| version, | ||
| schema_url, | ||
| attributes, | ||
| ) | ||
| config = self._logger_configurator(instrumentation_scope) | ||
| if config is None: | ||
| config = LoggerConfig( | ||
| disabled=self._disabled, | ||
| minimum_severity_level=self._min_severity_level, | ||
| trace_based_sampling=self._trace_based_sampling, | ||
| ) | ||
| return Logger( | ||
| self._resource, | ||
| self._multi_log_record_processor, | ||
| InstrumentationScope( | ||
| name, | ||
| version, | ||
| schema_url, | ||
| attributes, | ||
| ), | ||
| instrumentation_scope, | ||
| config=config, | ||
| ) | ||
|
|
||
| def _get_logger_cached( | ||
|
|
@@ -854,6 +1004,22 @@ def add_log_record_processor( | |
| log_record_processor | ||
| ) | ||
|
|
||
| def set_logger_configurator( | ||
| self, configurator: LoggerConfigurator | ||
| ) -> None: | ||
| """Update the logger configurator and apply the new configuration to all existing loggers.""" | ||
| with self._logger_cache_lock: | ||
| self._logger_configurator = configurator | ||
| for logger in self._logger_cache.values(): | ||
| new_config = configurator(logger.instrumentation_scope) | ||
| if new_config is None: | ||
| new_config = LoggerConfig( | ||
| disabled=self._disabled, | ||
| minimum_severity_level=self._min_severity_level, | ||
| trace_based_sampling=self._trace_based_sampling, | ||
| ) | ||
| logger.update_config(new_config) | ||
|
|
||
| def shutdown(self): | ||
| """Shuts down the log processors.""" | ||
| self._multi_log_record_processor.shutdown() | ||
|
|
@@ -933,3 +1099,54 @@ def std_to_otel(levelno: int) -> SeverityNumber: | |
| if levelno > 53: | ||
| return SeverityNumber.FATAL4 | ||
| return _STD_TO_OTEL[levelno] | ||
|
|
||
|
|
||
| def is_less_than_min_severity( | ||
| record: LogRecord, min_severity: SeverityNumber | ||
| ) -> bool: | ||
| """Checks if the log record's severity number is less than the minimum severity level. | ||
|
|
||
| Args: | ||
| record: The log record to be processed. | ||
| min_severity: The minimum severity level. | ||
|
|
||
| Returns: | ||
| True if the log record's severity number is less than the minimum | ||
| severity level, False otherwise. Log records with an unspecified severity (i.e. `0`) | ||
| are not affected by this parameter and therefore bypass minimum severity filtering. | ||
| """ | ||
| if record.severity_number is not None: | ||
| if ( | ||
| min_severity is not None | ||
| and min_severity != SeverityNumber.UNSPECIFIED | ||
| and record.severity_number.value < min_severity.value | ||
| ): | ||
| return True | ||
| return False | ||
|
|
||
|
|
||
| def should_drop_logs_for_unsampled_traces( | ||
| record: LogRecord, trace_based_sampling_flag: bool | ||
| ) -> bool: | ||
| """Determines whether the logger should drop log records associated with unsampled traces. | ||
|
|
||
| If `trace_based_sampling` is `true`, log records associated with unsampled traces are dropped by the `Logger`. | ||
| A log record is considered associated with an unsampled trace if it has a valid `SpanId` and its | ||
| `TraceFlags` indicate that the trace is unsampled. A log record that isn't associated with a trace | ||
| context is not affected by this parameter and therefore bypasses trace-based filtering. | ||
|
|
||
| Args: | ||
| record: The log record to be processed. | ||
| trace_based_sampling_flag: A boolean flag indicating whether trace-based filtering is enabled. If not explicitly set, | ||
| the `trace_based_sampling` parameter is set to `false` | ||
|
|
||
| Returns: | ||
| True if the log record should be dropped due to being associated with an unsampled trace. | ||
| """ | ||
| if trace_based_sampling_flag: | ||
| if record.context is not None: | ||
| span = get_current_span(record.context) | ||
| span_context = span.get_span_context() | ||
| if span_context.is_valid and not span_context.trace_flags.sampled: | ||
| return True | ||
| return False | ||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am a bit wary supporting concepts related to components that are still in development directly into the sdk, especially since we are trying to stabilize logging sdk. It seems like Go is taking the approach of implementing this internally via logrecordprocessors, perhaps this might be a better way so that we don't have to change the public api? Perhaps bring this up in the PYthon SIG.