diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index cf3e1860b..f8c7b287d 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -85,6 +85,7 @@ from google.cloud.bigtable.data.row_filters import StripValueTransformerFilter from google.cloud.bigtable.data.row_filters import CellsRowLimitFilter from google.cloud.bigtable.data.row_filters import RowFilterChain +from google.cloud.bigtable.data._metrics import BigtableClientSideMetricsController from google.cloud.bigtable.data._cross_sync import CrossSync @@ -969,6 +970,8 @@ def __init__( default_retryable_errors or () ) + self._metrics = BigtableClientSideMetricsController() + try: self._register_instance_future = CrossSync.create_task( self.client._register_instance, @@ -1682,6 +1685,7 @@ async def close(self): """ Called to close the Table instance and release any resources held by it. """ + self._metrics.close() if self._register_instance_future: self._register_instance_future.cancel() await self.client._remove_instance_registration(self.instance_id, self) diff --git a/google/cloud/bigtable/data/_async/metrics_interceptor.py b/google/cloud/bigtable/data/_async/metrics_interceptor.py index 89bc6df5a..0bd401a78 100644 --- a/google/cloud/bigtable/data/_async/metrics_interceptor.py +++ b/google/cloud/bigtable/data/_async/metrics_interceptor.py @@ -13,11 +13,22 @@ # limitations under the License from __future__ import annotations +from typing import Sequence + +import time +from functools import wraps + +from google.cloud.bigtable.data._metrics.data_model import ActiveOperationMetric +from google.cloud.bigtable.data._metrics.data_model import OperationState +from google.cloud.bigtable.data._metrics.data_model import OperationType +from google.cloud.bigtable.data._metrics.handlers._base import MetricsHandler + from google.cloud.bigtable.data._cross_sync import CrossSync if CrossSync.is_async: from grpc.aio import UnaryUnaryClientInterceptor from grpc.aio import UnaryStreamClientInterceptor + from grpc.aio import AioRpcError else: from grpc import UnaryUnaryClientInterceptor from grpc import UnaryStreamClientInterceptor @@ -26,30 +37,93 @@ __CROSS_SYNC_OUTPUT__ = "google.cloud.bigtable.data._sync_autogen.metrics_interceptor" +def _with_operation_from_metadata(func): + """ + Decorator for interceptor methods to extract the active operation + from metadata and pass it to the decorated function. 
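+
+    The wrapped method receives the active operation as an extra argument before
+    the usual (continuation, client_call_details, request) arguments. If no
+    operation is active in the current context, the bare continuation is returned
+    unchanged.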
+ """ + + @wraps(func) + def wrapper(self, continuation, client_call_details, request): + operation: "ActiveOperationMetric" | None = ActiveOperationMetric.get_active() + + if operation: + # start a new attempt if not started + if ( + operation.state == OperationState.CREATED + or operation.state == OperationState.BETWEEN_ATTEMPTS + ): + operation.start_attempt() + # wrap continuation in logic to process the operation + return func(self, operation, continuation, client_call_details, request) + else: + # if operation not found, return unwrapped continuation + return continuation(client_call_details, request) + + return wrapper + + +@CrossSync.convert +async def _get_metadata(source) -> dict[str, str | bytes] | None: + """Helper to extract metadata from a call or RpcError""" + try: + metadata: Sequence[tuple[str, str | bytes]] + if CrossSync.is_async: + # grpc.aio returns metadata in Metadata objects + if isinstance(source, AioRpcError): + metadata = list(source.trailing_metadata()) + list( + source.initial_metadata() + ) + else: + metadata = list(await source.trailing_metadata()) + list( + await source.initial_metadata() + ) + else: + # sync grpc returns metadata as a sequence of tuples + metadata = source.trailing_metadata() + source.initial_metadata() + # convert metadata to dict format + return {k: v for (k, v) in metadata} + except Exception: + # ignore errors while fetching metadata + return None + + @CrossSync.convert_class(sync_name="BigtableMetricsInterceptor") class AsyncBigtableMetricsInterceptor( - UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor + UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor, MetricsHandler ): """ An async gRPC interceptor to add client metadata and print server metadata. """ @CrossSync.convert - async def intercept_unary_unary(self, continuation, client_call_details, request): + @_with_operation_from_metadata + async def intercept_unary_unary( + self, operation, continuation, client_call_details, request + ): """ Interceptor for unary rpcs: - MutateRow - CheckAndMutateRow - ReadModifyWriteRow """ + metadata = None try: call = await continuation(client_call_details, request) + metadata = await _get_metadata(call) return call except Exception as rpc_error: + metadata = await _get_metadata(rpc_error) raise rpc_error + finally: + if metadata is not None: + operation.add_response_metadata(metadata) @CrossSync.convert - async def intercept_unary_stream(self, continuation, client_call_details, request): + @_with_operation_from_metadata + async def intercept_unary_stream( + self, operation, continuation, client_call_details, request + ): """ Interceptor for streaming rpcs: - ReadRows @@ -58,21 +132,41 @@ async def intercept_unary_stream(self, continuation, client_call_details, reques """ try: return self._streaming_generator_wrapper( - await continuation(client_call_details, request) + operation, await continuation(client_call_details, request) ) except Exception as rpc_error: - # handle errors while intializing stream + metadata = await _get_metadata(rpc_error) + if metadata is not None: + operation.add_response_metadata(metadata) raise rpc_error @staticmethod @CrossSync.convert - async def _streaming_generator_wrapper(call): + async def _streaming_generator_wrapper(operation, call): """ Wrapped generator to be returned by intercept_unary_stream """ + # only track has_first response for READ_ROWS + has_first_response = ( + operation.first_response_latency_ns is not None + or operation.op_type != OperationType.READ_ROWS + ) + encountered_exc = 
None try: async for response in call: + # record time to first response. Currently only used for READ_ROWs + if not has_first_response: + operation.first_response_latency_ns = ( + time.monotonic_ns() - operation.start_time_ns + ) + has_first_response = True yield response except Exception as e: # handle errors while processing stream - raise e + encountered_exc = e + raise + finally: + if call is not None: + metadata = await _get_metadata(encountered_exc or call) + if metadata is not None: + operation.add_response_metadata(metadata) diff --git a/google/cloud/bigtable/data/_helpers.py b/google/cloud/bigtable/data/_helpers.py index 424a34486..e848ebc6f 100644 --- a/google/cloud/bigtable/data/_helpers.py +++ b/google/cloud/bigtable/data/_helpers.py @@ -23,6 +23,7 @@ from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery from google.api_core import exceptions as core_exceptions +from google.api_core.retry import exponential_sleep_generator from google.api_core.retry import RetryFailureReason from google.cloud.bigtable.data.exceptions import RetryExceptionGroup @@ -248,3 +249,61 @@ def _get_retryable_errors( call_codes = table.default_mutate_rows_retryable_errors return [_get_error_type(e) for e in call_codes] + + +class TrackedBackoffGenerator: + """ + Generator class for exponential backoff sleep times. + This implementation builds on top of api_core.retries.exponential_sleep_generator, + adding the ability to retrieve previous values using get_attempt_backoff(idx). + This is used by the Metrics class to track the sleep times used for each attempt. + """ + + def __init__(self, initial=0.01, maximum=60, multiplier=2): + self.history = [] + self.subgenerator = exponential_sleep_generator( + initial=initial, maximum=maximum, multiplier=multiplier + ) + self._next_override: float | None = None + + def __iter__(self): + return self + + def set_next(self, next_value: float): + """ + Set the next backoff value, instead of generating one from subgenerator. + After the value is yielded, it will go back to using self.subgenerator. + + If set_next is called twice before the next() is called, only the latest + value will be used and others discarded + + Args: + next_value: the upcomming value to yield when next() is called + Raises: + ValueError: if next_value is negative + """ + if next_value < 0: + raise ValueError("backoff value cannot be less than 0") + self._next_override = next_value + + def __next__(self) -> float: + if self._next_override is not None: + next_backoff = self._next_override + self._next_override = None + else: + next_backoff = next(self.subgenerator) + self.history.append(next_backoff) + return next_backoff + + def get_attempt_backoff(self, attempt_idx) -> float: + """ + returns the backoff time for a specific attempt index, starting at 0. + + Args: + attempt_idx: the index of the attempt to return backoff for + Raises: + IndexError: if attempt_idx is negative, or not in history + """ + if attempt_idx < 0: + raise IndexError("received negative attempt number") + return self.history[attempt_idx] diff --git a/google/cloud/bigtable/data/_metrics/__init__.py b/google/cloud/bigtable/data/_metrics/__init__.py new file mode 100644 index 000000000..20d36d4c8 --- /dev/null +++ b/google/cloud/bigtable/data/_metrics/__init__.py @@ -0,0 +1,31 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from google.cloud.bigtable.data._metrics.metrics_controller import ( + BigtableClientSideMetricsController, +) + +from google.cloud.bigtable.data._metrics.data_model import OperationType +from google.cloud.bigtable.data._metrics.data_model import ActiveOperationMetric +from google.cloud.bigtable.data._metrics.data_model import ActiveAttemptMetric +from google.cloud.bigtable.data._metrics.data_model import CompletedOperationMetric +from google.cloud.bigtable.data._metrics.data_model import CompletedAttemptMetric + +__all__ = ( + "BigtableClientSideMetricsController", + "OperationType", + "ActiveOperationMetric", + "ActiveAttemptMetric", + "CompletedOperationMetric", + "CompletedAttemptMetric", +) diff --git a/google/cloud/bigtable/data/_metrics/data_model.py b/google/cloud/bigtable/data/_metrics/data_model.py new file mode 100644 index 000000000..d0d9b5f52 --- /dev/null +++ b/google/cloud/bigtable/data/_metrics/data_model.py @@ -0,0 +1,499 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
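+
+# This module defines the client-side metrics data model: immutable records for
+# completed attempts and operations, mutable records for attempts and operations
+# still in flight, and the state machine that moves an operation from CREATED
+# through ACTIVE_ATTEMPT and BETWEEN_ATTEMPTS to COMPLETED.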
+from __future__ import annotations + +from typing import Callable, ClassVar, List, Tuple, Optional, cast, TYPE_CHECKING + +import time +import re +import logging +import uuid +import contextvars + +from enum import Enum +from functools import lru_cache +from dataclasses import dataclass +from dataclasses import field +from grpc import StatusCode +from grpc import RpcError +from grpc.aio import AioRpcError + +from google.api_core.exceptions import GoogleAPICallError +from google.api_core.retry import RetryFailureReason +import google.cloud.bigtable.data.exceptions as bt_exceptions +from google.cloud.bigtable_v2.types.response_params import ResponseParams +from google.cloud.bigtable.data._helpers import TrackedBackoffGenerator +from google.cloud.bigtable.data.exceptions import _MutateRowsIncomplete +from google.protobuf.message import DecodeError + +if TYPE_CHECKING: + from google.cloud.bigtable.data._metrics.handlers._base import MetricsHandler + + +LOGGER = logging.getLogger(__name__) + +# default values for zone and cluster data, if not captured +DEFAULT_ZONE = "global" +DEFAULT_CLUSTER_ID = "unspecified" + +# keys for parsing metadata blobs +BIGTABLE_METADATA_KEY = "x-goog-ext-425905942-bin" +SERVER_TIMING_METADATA_KEY = "server-timing" +SERVER_TIMING_REGEX = re.compile(r".*gfet4t7;\s*dur=(\d+\.?\d*).*") + +INVALID_STATE_ERROR = "Invalid state for {}: {}" + +ExceptionFactoryType = Callable[ + [List[Exception], RetryFailureReason, Optional[float]], + Tuple[Exception, Optional[Exception]], +] + + +class OperationType(Enum): + """Enum for the type of operation being performed.""" + + READ_ROWS = "ReadRows" + SAMPLE_ROW_KEYS = "SampleRowKeys" + BULK_MUTATE_ROWS = "MutateRows" + MUTATE_ROW = "MutateRow" + CHECK_AND_MUTATE = "CheckAndMutateRow" + READ_MODIFY_WRITE = "ReadModifyWriteRow" + + +class OperationState(Enum): + """Enum for the state of the active operation.""" + + CREATED = 0 + ACTIVE_ATTEMPT = 1 + BETWEEN_ATTEMPTS = 2 + COMPLETED = 3 + + +@dataclass(frozen=True) +class CompletedAttemptMetric: + """ + An immutable dataclass representing the data associated with a + completed rpc attempt. + + Operation-level fields (eg. type, cluster, zone) are stored on the + corresponding CompletedOperationMetric or ActiveOperationMetric object. + """ + + duration_ns: int + end_status: StatusCode + gfe_latency_ns: int | None = None + application_blocking_time_ns: int = 0 + backoff_before_attempt_ns: int = 0 + grpc_throttling_time_ns: int = 0 + + +@dataclass(frozen=True) +class CompletedOperationMetric: + """ + An immutable dataclass representing the data associated with a + completed rpc operation. + + Attempt-level fields (eg. duration, latencies, etc) are stored on the + corresponding CompletedAttemptMetric object. + """ + + op_type: OperationType + uuid: str + duration_ns: int + completed_attempts: list[CompletedAttemptMetric] + final_status: StatusCode + cluster_id: str + zone: str + is_streaming: bool + first_response_latency_ns: int | None = None + flow_throttling_time_ns: int = 0 + + +@dataclass +class ActiveAttemptMetric: + """ + A dataclass representing the data associated with an rpc attempt that is + currently in progress. Fields are mutable and may be optional. + """ + + # keep monotonic timestamps for active attempts + start_time_ns: int = field(default_factory=time.monotonic_ns) + # the time taken by the backend, in nanoseconds. 
Taken from response header
+    gfe_latency_ns: int | None = None
+    # time waiting on user to process the response, in nanoseconds
+    # currently only relevant for ReadRows
+    application_blocking_time_ns: int = 0
+    # backoff time is added to application_blocking_time_ns
+    backoff_before_attempt_ns: int = 0
+    # time waiting on grpc channel, in nanoseconds
+    # TODO: capture grpc_throttling_time
+    grpc_throttling_time_ns: int = 0
+
+
+@dataclass
+class ActiveOperationMetric:
+    """
+    A dataclass representing the data associated with an rpc operation that is
+    currently in progress. Fields are mutable and may be optional.
+    """
+
+    op_type: OperationType
+    uuid: str = field(default_factory=lambda: str(uuid.uuid4()))
+    # create a default backoff generator, initialized with standard default backoff values
+    backoff_generator: TrackedBackoffGenerator = field(
+        default_factory=lambda: TrackedBackoffGenerator(
+            initial=0.01, maximum=60, multiplier=2
+        )
+    )
+    # keep monotonic timestamps for active operations
+    start_time_ns: int = field(default_factory=time.monotonic_ns)
+    active_attempt: ActiveAttemptMetric | None = None
+    cluster_id: str | None = None
+    zone: str | None = None
+    completed_attempts: list[CompletedAttemptMetric] = field(default_factory=list)
+    is_streaming: bool = False  # only True for read_rows operations
+    was_completed: bool = False
+    handlers: list[MetricsHandler] = field(default_factory=list)
+    # the time it takes to receive the first response from the server, in nanoseconds
+    # attached by interceptor
+    # currently only tracked for ReadRows
+    first_response_latency_ns: int | None = None
+    # time waiting on flow control, in nanoseconds
+    flow_throttling_time_ns: int = 0
+
+    _active_operation_context: ClassVar[
+        contextvars.ContextVar[ActiveOperationMetric]
+    ] = contextvars.ContextVar("active_operation_context")
+
+    @classmethod
+    def get_active(cls):
+        return cls._active_operation_context.get(None)
+
+    @property
+    def state(self) -> OperationState:
+        if self.was_completed:
+            return OperationState.COMPLETED
+        elif self.active_attempt is None:
+            if self.completed_attempts:
+                return OperationState.BETWEEN_ATTEMPTS
+            else:
+                return OperationState.CREATED
+        else:
+            return OperationState.ACTIVE_ATTEMPT
+
+    def __post_init__(self):
+        self._active_operation_context.set(self)
+
+    def start(self) -> None:
+        """
+        Optionally called to mark the start of the operation. If not called,
+        the operation will be started at initialization.
+
+        Assumes operation is in CREATED state.
+        """
+        if self.state != OperationState.CREATED:
+            return self._handle_error(INVALID_STATE_ERROR.format("start", self.state))
+        self.start_time_ns = time.monotonic_ns()
+        self._active_operation_context.set(self)
+
+    def start_attempt(self) -> ActiveAttemptMetric | None:
+        """
+        Called to initiate a new attempt for the operation.
+ + Assumes operation is in either CREATED or BETWEEN_ATTEMPTS states + """ + if ( + self.state != OperationState.BETWEEN_ATTEMPTS + and self.state != OperationState.CREATED + ): + return self._handle_error( + INVALID_STATE_ERROR.format("start_attempt", self.state) + ) + self._active_operation_context.set(self) + + try: + # find backoff value before this attempt + prev_attempt_idx = len(self.completed_attempts) - 1 + backoff = self.backoff_generator.get_attempt_backoff(prev_attempt_idx) + # generator will return the backoff time in seconds, so convert to nanoseconds + backoff_ns = int(backoff * 1e9) + except IndexError: + # backoff value not found + backoff_ns = 0 + + self.active_attempt = ActiveAttemptMetric(backoff_before_attempt_ns=backoff_ns) + return self.active_attempt + + def add_response_metadata(self, metadata: dict[str, bytes | str]) -> None: + """ + Attach trailing metadata to the active attempt. + + If not called, default values for the metadata will be used. + + Assumes operation is in ACTIVE_ATTEMPT state. + + Args: + - metadata: the metadata as extracted from the grpc call + """ + if self.state != OperationState.ACTIVE_ATTEMPT: + return self._handle_error( + INVALID_STATE_ERROR.format("add_response_metadata", self.state) + ) + if self.cluster_id is None or self.zone is None: + # BIGTABLE_METADATA_KEY should give a binary-encoded ResponseParams proto + blob = cast(bytes, metadata.get(BIGTABLE_METADATA_KEY)) + if blob: + parse_result = self._parse_response_metadata_blob(blob) + if parse_result is not None: + cluster, zone = parse_result + if cluster: + self.cluster_id = cluster + if zone: + self.zone = zone + else: + self._handle_error( + f"Failed to decode {BIGTABLE_METADATA_KEY} metadata: {blob!r}" + ) + # SERVER_TIMING_METADATA_KEY should give a string with the server-latency headers + timing_header = cast(str, metadata.get(SERVER_TIMING_METADATA_KEY)) + if timing_header: + timing_data = SERVER_TIMING_REGEX.match(timing_header) + if timing_data and self.active_attempt: + gfe_latency_ms = float(timing_data.group(1)) + self.active_attempt.gfe_latency_ns = int(gfe_latency_ms * 1e6) + + @staticmethod + @lru_cache(maxsize=32) + def _parse_response_metadata_blob(blob: bytes) -> Tuple[str, str] | None: + """ + Parse the response metadata blob and return a tuple of cluster and zone. + + Function is cached to avoid parsing the same blob multiple times. + + Args: + - blob: the metadata blob as extracted from the grpc call + Returns: + - a tuple of cluster_id and zone, or None if parsing failed + """ + try: + proto = ResponseParams.pb().FromString(blob) + return proto.cluster_id, proto.zone_id + except (DecodeError, TypeError): + # failed to parse metadata + return None + + def end_attempt_with_status(self, status: StatusCode | BaseException) -> None: + """ + Called to mark the end of an attempt for the operation. + + Typically, this is used to mark a retryable error. If a retry will not + be attempted, `end_with_status` or `end_with_success` should be used + to finalize the operation along with the attempt. + + Assumes operation is in ACTIVE_ATTEMPT state. + + Args: + - status: The status of the attempt. 
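+              If a BaseException is passed instead of a StatusCode, it is
+              converted with _exc_to_status before being recorded.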
+ """ + if self.state != OperationState.ACTIVE_ATTEMPT or self.active_attempt is None: + return self._handle_error( + INVALID_STATE_ERROR.format("end_attempt_with_status", self.state) + ) + if isinstance(status, BaseException): + status = self._exc_to_status(status) + complete_attempt = CompletedAttemptMetric( + duration_ns=time.monotonic_ns() - self.active_attempt.start_time_ns, + end_status=status, + gfe_latency_ns=self.active_attempt.gfe_latency_ns, + application_blocking_time_ns=self.active_attempt.application_blocking_time_ns, + backoff_before_attempt_ns=self.active_attempt.backoff_before_attempt_ns, + grpc_throttling_time_ns=self.active_attempt.grpc_throttling_time_ns, + ) + self.completed_attempts.append(complete_attempt) + self.active_attempt = None + for handler in self.handlers: + handler.on_attempt_complete(complete_attempt, self) + + def end_with_status(self, status: StatusCode | BaseException) -> None: + """ + Called to mark the end of the operation. If there is an active attempt, + end_attempt_with_status will be called with the same status. + + Assumes operation is not already in COMPLETED state. + + Causes on_operation_completed to be called for each registered handler. + + Args: + - status: The status of the operation. + """ + if self.state == OperationState.COMPLETED: + return self._handle_error( + INVALID_STATE_ERROR.format("end_with_status", self.state) + ) + final_status = ( + self._exc_to_status(status) if isinstance(status, BaseException) else status + ) + if self.state == OperationState.ACTIVE_ATTEMPT: + self.end_attempt_with_status(final_status) + self.was_completed = True + finalized = CompletedOperationMetric( + op_type=self.op_type, + uuid=self.uuid, + completed_attempts=self.completed_attempts, + duration_ns=time.monotonic_ns() - self.start_time_ns, + final_status=final_status, + cluster_id=self.cluster_id or DEFAULT_CLUSTER_ID, + zone=self.zone or DEFAULT_ZONE, + is_streaming=self.is_streaming, + first_response_latency_ns=self.first_response_latency_ns, + flow_throttling_time_ns=self.flow_throttling_time_ns, + ) + for handler in self.handlers: + handler.on_operation_complete(finalized) + + def end_with_success(self): + """ + Called to mark the end of the operation with a successful status. + + Assumes operation is not already in COMPLETED state. + + Causes on_operation_completed to be called for each registered handler. + """ + return self.end_with_status(StatusCode.OK) + + @staticmethod + def _exc_to_status(exc: BaseException) -> StatusCode: + """ + Extracts the grpc status code from an exception. + + Exception groups and wrappers will be parsed to find the underlying + grpc Exception. + + If the exception is not a grpc exception, will return StatusCode.UNKNOWN. + + Args: + - exc: The exception to extract the status code from. 
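+        Returns:
+            - the resolved grpc StatusCode, or StatusCode.UNKNOWN if no grpc code is found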
+        """
+        if isinstance(exc, bt_exceptions._BigtableExceptionGroup):
+            exc = exc.exceptions[-1]
+        if hasattr(exc, "grpc_status_code") and exc.grpc_status_code is not None:
+            return exc.grpc_status_code
+        if (
+            exc.__cause__
+            and hasattr(exc.__cause__, "grpc_status_code")
+            and exc.__cause__.grpc_status_code is not None
+        ):
+            return exc.__cause__.grpc_status_code
+        if isinstance(exc, AioRpcError) or isinstance(exc, RpcError):
+            return exc.code()
+        return StatusCode.UNKNOWN
+
+    def track_retryable_error(self, exc: Exception) -> None:
+        """
+        Used as input to api_core.Retry classes, to track when retryable errors are encountered
+
+        Should be passed as on_error callback
+        """
+        try:
+            # record metadata from failed rpc
+            if isinstance(exc, GoogleAPICallError) and exc.errors:
+                rpc_error = exc.errors[-1]
+                metadata = list(rpc_error.trailing_metadata()) + list(
+                    rpc_error.initial_metadata()
+                )
+                self.add_response_metadata({k: v for k, v in metadata})
+        except Exception:
+            # ignore errors in metadata collection
+            pass
+        if isinstance(exc, _MutateRowsIncomplete):
+            # _MutateRowsIncomplete represents a successful rpc with some failed mutations
+            # mark the attempt as successful
+            self.end_attempt_with_status(StatusCode.OK)
+        else:
+            self.end_attempt_with_status(exc)
+
+    def track_terminal_error(
+        self, exception_factory: ExceptionFactoryType
+    ) -> ExceptionFactoryType:
+        """
+        Used as input to api_core.Retry classes, to track when terminal errors are encountered
+
+        Should be used as a wrapper over an exception_factory callback
+        """
+
+        def wrapper(
+            exc_list: list[Exception],
+            reason: RetryFailureReason,
+            timeout_val: float | None,
+        ) -> tuple[Exception, Exception | None]:
+            source_exc, cause_exc = exception_factory(exc_list, reason, timeout_val)
+            try:
+                # record metadata from failed rpc
+                if isinstance(source_exc, GoogleAPICallError) and source_exc.errors:
+                    rpc_error = source_exc.errors[-1]
+                    metadata = list(rpc_error.trailing_metadata()) + list(
+                        rpc_error.initial_metadata()
+                    )
+                    self.add_response_metadata({k: v for k, v in metadata})
+            except Exception:
+                # ignore errors in metadata collection
+                pass
+            if (
+                reason == RetryFailureReason.TIMEOUT
+                and self.state == OperationState.ACTIVE_ATTEMPT
+                and exc_list
+            ):
+                # record ending attempt for timeout failures
+                attempt_exc = exc_list[-1]
+                self.track_retryable_error(attempt_exc)
+            self.end_with_status(source_exc)
+            return source_exc, cause_exc
+
+        return wrapper
+
+    @staticmethod
+    def _handle_error(message: str) -> None:
+        """
+        Log metric system error messages
+
+        Args:
+            - message: The message to include in the logged warning.
+        """
+        full_message = f"Error in Bigtable Metrics: {message}"
+        LOGGER.warning(full_message)
+
+    def __enter__(self):
+        """
+        Implements the context manager protocol
+
+        Using the operation's context manager provides assurances that the operation
+        is always closed when complete, with the proper status code automatically
+        detected when an exception is raised.
+        """
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        """
+        Implements the context manager protocol
+
+        The operation is automatically ended on exit, with the status determined
+        by the exception type and value.
+
+        If operation was already ended manually, do nothing.
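+
+        Exceptions raised inside the block are not suppressed; they propagate
+        after the operation is finalized.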
+ """ + if not self.state == OperationState.COMPLETED: + if exc_val is None: + self.end_with_success() + else: + self.end_with_status(exc_val) diff --git a/google/cloud/bigtable/data/_metrics/handlers/_base.py b/google/cloud/bigtable/data/_metrics/handlers/_base.py new file mode 100644 index 000000000..884091fdd --- /dev/null +++ b/google/cloud/bigtable/data/_metrics/handlers/_base.py @@ -0,0 +1,38 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from google.cloud.bigtable.data._metrics.data_model import ActiveOperationMetric +from google.cloud.bigtable.data._metrics.data_model import CompletedAttemptMetric +from google.cloud.bigtable.data._metrics.data_model import CompletedOperationMetric + + +class MetricsHandler: + """ + Base class for all metrics handlers. Metrics handlers will receive callbacks + when operations and attempts are completed, and can use this information to + update some external metrics system. + """ + + def __init__(self, **kwargs): + pass + + def on_operation_complete(self, op: CompletedOperationMetric) -> None: + pass + + def on_attempt_complete( + self, attempt: CompletedAttemptMetric, op: ActiveOperationMetric + ) -> None: + pass + + def close(self): + pass diff --git a/google/cloud/bigtable/data/_metrics/metrics_controller.py b/google/cloud/bigtable/data/_metrics/metrics_controller.py new file mode 100644 index 000000000..e9815f201 --- /dev/null +++ b/google/cloud/bigtable/data/_metrics/metrics_controller.py @@ -0,0 +1,63 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +from google.cloud.bigtable.data._metrics.data_model import ActiveOperationMetric +from google.cloud.bigtable.data._metrics.handlers._base import MetricsHandler +from google.cloud.bigtable.data._metrics.data_model import OperationType + + +class BigtableClientSideMetricsController: + """ + BigtableClientSideMetricsController is responsible for managing the + lifecycle of the metrics system. The Bigtable client library will + use this class to create new operations. Each operation will be + registered with the handlers associated with this controller. + """ + + def __init__( + self, + handlers: list[MetricsHandler] | None = None, + ): + """ + Initializes the metrics controller. + + Args: + - handlers: A list of MetricsHandler objects to subscribe to metrics events. 
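+              Defaults to an empty list; additional handlers can be registered
+              later through add_handler.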
+ """ + self.handlers: list[MetricsHandler] = handlers or [] + + def add_handler(self, handler: MetricsHandler) -> None: + """ + Add a new handler to the list of handlers. + + Args: + - handler: A MetricsHandler object to add to the list of subscribed handlers. + """ + self.handlers.append(handler) + + def create_operation( + self, op_type: OperationType, **kwargs + ) -> ActiveOperationMetric: + """ + Creates a new operation and registers it with the subscribed handlers. + """ + return ActiveOperationMetric(op_type, **kwargs, handlers=self.handlers) + + def close(self): + """ + Close all handlers. + """ + for handler in self.handlers: + handler.close() diff --git a/google/cloud/bigtable/data/_sync_autogen/client.py b/google/cloud/bigtable/data/_sync_autogen/client.py index 3d718f966..86993cf56 100644 --- a/google/cloud/bigtable/data/_sync_autogen/client.py +++ b/google/cloud/bigtable/data/_sync_autogen/client.py @@ -72,6 +72,7 @@ from google.cloud.bigtable.data.row_filters import StripValueTransformerFilter from google.cloud.bigtable.data.row_filters import CellsRowLimitFilter from google.cloud.bigtable.data.row_filters import RowFilterChain +from google.cloud.bigtable.data._metrics import BigtableClientSideMetricsController from google.cloud.bigtable.data._cross_sync import CrossSync from typing import Iterable from grpc import insecure_channel @@ -756,6 +757,7 @@ def __init__( self.default_retryable_errors: Sequence[type[Exception]] = ( default_retryable_errors or () ) + self._metrics = BigtableClientSideMetricsController() try: self._register_instance_future = CrossSync._Sync_Impl.create_task( self.client._register_instance, @@ -1412,6 +1414,7 @@ def read_modify_write_row( def close(self): """Called to close the Table instance and release any resources held by it.""" + self._metrics.close() if self._register_instance_future: self._register_instance_future.cancel() self.client._remove_instance_registration(self.instance_id, self) diff --git a/google/cloud/bigtable/data/_sync_autogen/metrics_interceptor.py b/google/cloud/bigtable/data/_sync_autogen/metrics_interceptor.py index 1c71c6b31..dcc17e591 100644 --- a/google/cloud/bigtable/data/_sync_autogen/metrics_interceptor.py +++ b/google/cloud/bigtable/data/_sync_autogen/metrics_interceptor.py @@ -15,45 +15,113 @@ # This file is automatically generated by CrossSync. Do not edit manually. 
from __future__ import annotations +from typing import Sequence +import time +from functools import wraps +from google.cloud.bigtable.data._metrics.data_model import ActiveOperationMetric +from google.cloud.bigtable.data._metrics.data_model import OperationState +from google.cloud.bigtable.data._metrics.data_model import OperationType +from google.cloud.bigtable.data._metrics.handlers._base import MetricsHandler from grpc import UnaryUnaryClientInterceptor from grpc import UnaryStreamClientInterceptor +def _with_operation_from_metadata(func): + """Decorator for interceptor methods to extract the active operation + from metadata and pass it to the decorated function.""" + + @wraps(func) + def wrapper(self, continuation, client_call_details, request): + operation: "ActiveOperationMetric" | None = ActiveOperationMetric.get_active() + if operation: + if ( + operation.state == OperationState.CREATED + or operation.state == OperationState.BETWEEN_ATTEMPTS + ): + operation.start_attempt() + return func(self, operation, continuation, client_call_details, request) + else: + return continuation(client_call_details, request) + + return wrapper + + +def _get_metadata(source) -> dict[str, str | bytes] | None: + """Helper to extract metadata from a call or RpcError""" + try: + metadata: Sequence[tuple[str, str | bytes]] + metadata = source.trailing_metadata() + source.initial_metadata() + return {k: v for (k, v) in metadata} + except Exception: + return None + + class BigtableMetricsInterceptor( - UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor + UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor, MetricsHandler ): """ An async gRPC interceptor to add client metadata and print server metadata. """ - def intercept_unary_unary(self, continuation, client_call_details, request): + @_with_operation_from_metadata + def intercept_unary_unary( + self, operation, continuation, client_call_details, request + ): """Interceptor for unary rpcs: - MutateRow - CheckAndMutateRow - ReadModifyWriteRow""" + metadata = None try: call = continuation(client_call_details, request) + metadata = _get_metadata(call) return call except Exception as rpc_error: + metadata = _get_metadata(rpc_error) raise rpc_error + finally: + if metadata is not None: + operation.add_response_metadata(metadata) - def intercept_unary_stream(self, continuation, client_call_details, request): + @_with_operation_from_metadata + def intercept_unary_stream( + self, operation, continuation, client_call_details, request + ): """Interceptor for streaming rpcs: - ReadRows - MutateRows - SampleRowKeys""" try: return self._streaming_generator_wrapper( - continuation(client_call_details, request) + operation, continuation(client_call_details, request) ) except Exception as rpc_error: + metadata = _get_metadata(rpc_error) + if metadata is not None: + operation.add_response_metadata(metadata) raise rpc_error @staticmethod - def _streaming_generator_wrapper(call): + def _streaming_generator_wrapper(operation, call): """Wrapped generator to be returned by intercept_unary_stream""" + has_first_response = ( + operation.first_response_latency_ns is not None + or operation.op_type != OperationType.READ_ROWS + ) + encountered_exc = None try: for response in call: + if not has_first_response: + operation.first_response_latency_ns = ( + time.monotonic_ns() - operation.start_time_ns + ) + has_first_response = True yield response except Exception as e: - raise e + encountered_exc = e + raise + finally: + if call is not None: + metadata = 
_get_metadata(encountered_exc or call) + if metadata is not None: + operation.add_response_metadata(metadata) diff --git a/tests/unit/data/_async/test_client.py b/tests/unit/data/_async/test_client.py index 9e434d12f..2cae7a08c 100644 --- a/tests/unit/data/_async/test_client.py +++ b/tests/unit/data/_async/test_client.py @@ -55,18 +55,26 @@ from google.cloud.bigtable.data._async._swappable_channel import ( AsyncSwappableChannel, ) + from google.cloud.bigtable.data._async.metrics_interceptor import ( + AsyncBigtableMetricsInterceptor, + ) CrossSync.add_mapping("grpc_helpers", grpc_helpers_async) CrossSync.add_mapping("SwappableChannel", AsyncSwappableChannel) + CrossSync.add_mapping("MetricsInterceptor", AsyncBigtableMetricsInterceptor) else: from google.api_core import grpc_helpers from google.cloud.bigtable.data._sync_autogen.client import Table # noqa: F401 from google.cloud.bigtable.data._sync_autogen._swappable_channel import ( SwappableChannel, ) + from google.cloud.bigtable.data._sync_autogen.metrics_interceptor import ( + BigtableMetricsInterceptor, + ) CrossSync.add_mapping("grpc_helpers", grpc_helpers) CrossSync.add_mapping("SwappableChannel", SwappableChannel) + CrossSync.add_mapping("MetricsInterceptor", BigtableMetricsInterceptor) __CROSS_SYNC_OUTPUT__ = "tests.unit.data._sync_autogen.test_client" @@ -114,6 +122,7 @@ async def test_ctor(self): assert not client._active_instances assert client._channel_refresh_task is not None assert client.transport._credentials == expected_credentials + assert isinstance(client._metrics_interceptor, CrossSync.MetricsInterceptor) await client.close() @CrossSync.pytest @@ -1151,6 +1160,9 @@ def _make_one( @CrossSync.pytest async def test_ctor(self): from google.cloud.bigtable.data._helpers import _WarmedInstanceKey + from google.cloud.bigtable.data._metrics import ( + BigtableClientSideMetricsController, + ) expected_table_id = "table-id" expected_instance_id = "instance-id" @@ -1192,6 +1204,7 @@ async def test_ctor(self): instance_key = _WarmedInstanceKey(table.instance_name, table.app_profile_id) assert instance_key in client._active_instances assert client._instance_owners[instance_key] == {id(table)} + assert isinstance(table._metrics, BigtableClientSideMetricsController) assert table.default_operation_timeout == expected_operation_timeout assert table.default_attempt_timeout == expected_attempt_timeout assert ( @@ -1451,6 +1464,20 @@ async def test_call_metadata(self, include_app_profile, fn_name, fn_args, gapic_ # empty app_profile_id should send empty string assert "app_profile_id=" in routing_str + @CrossSync.pytest + async def test_close(self): + client = self._make_client() + table = self._make_one(client) + with mock.patch.object( + table._metrics, "close", mock.Mock() + ) as metric_close_mock: + with mock.patch.object( + client, "_remove_instance_registration" + ) as remove_mock: + await table.close() + remove_mock.assert_called_once_with(table.instance_id, table) + metric_close_mock.assert_called_once() + @CrossSync.convert_class( "TestAuthorizedView", add_mapping_for_name="TestAuthorizedView" @@ -1481,6 +1508,9 @@ def _make_one( @CrossSync.pytest async def test_ctor(self): from google.cloud.bigtable.data._helpers import _WarmedInstanceKey + from google.cloud.bigtable.data._metrics import ( + BigtableClientSideMetricsController, + ) expected_table_id = "table-id" expected_instance_id = "instance-id" @@ -1529,6 +1559,7 @@ async def test_ctor(self): instance_key = _WarmedInstanceKey(view.instance_name, view.app_profile_id) assert 
instance_key in client._active_instances assert client._instance_owners[instance_key] == {id(view)} + assert isinstance(view._metrics, BigtableClientSideMetricsController) assert view.default_operation_timeout == expected_operation_timeout assert view.default_attempt_timeout == expected_attempt_timeout assert ( diff --git a/tests/unit/data/_async/test_metrics_interceptor.py b/tests/unit/data/_async/test_metrics_interceptor.py index 59fb06c47..1593b8c99 100644 --- a/tests/unit/data/_async/test_metrics_interceptor.py +++ b/tests/unit/data/_async/test_metrics_interceptor.py @@ -14,7 +14,10 @@ import pytest from grpc import RpcError +from grpc import ClientCallDetails +from google.cloud.bigtable.data._metrics.data_model import ActiveOperationMetric +from google.cloud.bigtable.data._metrics.data_model import OperationState from google.cloud.bigtable.data._cross_sync import CrossSync # try/except added for compatibility with python < 3.8 @@ -67,102 +70,267 @@ def _get_target_class(): def _make_one(self, *args, **kwargs): return self._get_target_class()(*args, **kwargs) + @CrossSync.pytest + async def test_unary_unary_interceptor_op_not_found(self): + """Test that interceptor call continuation if op is not found""" + instance = self._make_one() + continuation = CrossSync.Mock() + details = ClientCallDetails() + details.metadata = [] + request = mock.Mock() + await instance.intercept_unary_unary(continuation, details, request) + continuation.assert_called_once_with(details, request) + @CrossSync.pytest async def test_unary_unary_interceptor_success(self): """Test that interceptor handles successful unary-unary calls""" instance = self._make_one() + op = mock.Mock() + op.uuid = "test-uuid" + op.state = OperationState.ACTIVE_ATTEMPT + ActiveOperationMetric._active_operation_context.set(op) continuation = CrossSync.Mock() call = continuation.return_value - details = mock.Mock() + call.trailing_metadata = CrossSync.Mock(return_value=[("a", "b")]) + call.initial_metadata = CrossSync.Mock(return_value=[("c", "d")]) + details = ClientCallDetails() request = mock.Mock() result = await instance.intercept_unary_unary(continuation, details, request) assert result == call continuation.assert_called_once_with(details, request) + op.add_response_metadata.assert_called_once_with({"a": "b", "c": "d"}) + op.end_attempt_with_status.assert_not_called() @CrossSync.pytest async def test_unary_unary_interceptor_failure(self): """test a failed RpcError with metadata""" + instance = self._make_one() + op = mock.Mock() + op.uuid = "test-uuid" + op.state = OperationState.ACTIVE_ATTEMPT + ActiveOperationMetric._active_operation_context.set(op) + exc = RpcError("test") + exc.trailing_metadata = CrossSync.Mock(return_value=[("a", "b")]) + exc.initial_metadata = CrossSync.Mock(return_value=[("c", "d")]) + continuation = CrossSync.Mock(side_effect=exc) + details = ClientCallDetails() + request = mock.Mock() + with pytest.raises(RpcError) as e: + await instance.intercept_unary_unary(continuation, details, request) + assert e.value == exc + continuation.assert_called_once_with(details, request) + op.add_response_metadata.assert_called_once_with({"a": "b", "c": "d"}) + @CrossSync.pytest + async def test_unary_unary_interceptor_failure_no_metadata(self): + """test with RpcError without without metadata attached""" instance = self._make_one() + op = mock.Mock() + op.uuid = "test-uuid" + op.state = OperationState.ACTIVE_ATTEMPT + ActiveOperationMetric._active_operation_context.set(op) exc = RpcError("test") continuation = 
CrossSync.Mock(side_effect=exc) - details = mock.Mock() + call = continuation.return_value + call.trailing_metadata = CrossSync.Mock(return_value=[("a", "b")]) + call.initial_metadata = CrossSync.Mock(return_value=[("c", "d")]) + details = ClientCallDetails() request = mock.Mock() with pytest.raises(RpcError) as e: await instance.intercept_unary_unary(continuation, details, request) assert e.value == exc continuation.assert_called_once_with(details, request) + op.add_response_metadata.assert_not_called() @CrossSync.pytest async def test_unary_unary_interceptor_failure_generic(self): """test generic exception""" - instance = self._make_one() + op = mock.Mock() + op.uuid = "test-uuid" + op.state = OperationState.ACTIVE_ATTEMPT + ActiveOperationMetric._active_operation_context.set(op) exc = ValueError("test") continuation = CrossSync.Mock(side_effect=exc) - details = mock.Mock() + call = continuation.return_value + call.trailing_metadata = CrossSync.Mock(return_value=[("a", "b")]) + call.initial_metadata = CrossSync.Mock(return_value=[("c", "d")]) + details = ClientCallDetails() request = mock.Mock() with pytest.raises(ValueError) as e: await instance.intercept_unary_unary(continuation, details, request) assert e.value == exc continuation.assert_called_once_with(details, request) + op.add_response_metadata.assert_not_called() + + @CrossSync.pytest + async def test_unary_stream_interceptor_op_not_found(self): + """Test that interceptor calls continuation if op is not found""" + instance = self._make_one() + continuation = CrossSync.Mock() + details = ClientCallDetails() + details.metadata = [] + request = mock.Mock() + await instance.intercept_unary_stream(continuation, details, request) + continuation.assert_called_once_with(details, request) @CrossSync.pytest async def test_unary_stream_interceptor_success(self): """Test that interceptor handles successful unary-stream calls""" - instance = self._make_one() + op = mock.Mock() + op.uuid = "test-uuid" + op.state = OperationState.ACTIVE_ATTEMPT + op.start_time_ns = 0 + op.first_response_latency = None + ActiveOperationMetric._active_operation_context.set(op) continuation = CrossSync.Mock(return_value=_make_mock_stream_call([1, 2])) - details = mock.Mock() + call = continuation.return_value + call.trailing_metadata = CrossSync.Mock(return_value=[("a", "b")]) + call.initial_metadata = CrossSync.Mock(return_value=[("c", "d")]) + details = ClientCallDetails() request = mock.Mock() wrapper = await instance.intercept_unary_stream(continuation, details, request) results = [val async for val in wrapper] assert results == [1, 2] continuation.assert_called_once_with(details, request) + assert op.first_response_latency_ns is not None + op.add_response_metadata.assert_called_once_with({"a": "b", "c": "d"}) + op.end_attempt_with_status.assert_not_called() @CrossSync.pytest async def test_unary_stream_interceptor_failure_mid_stream(self): """Test that interceptor handles failures mid-stream""" + from grpc.aio import AioRpcError, Metadata + instance = self._make_one() - exc = ValueError("test") + op = mock.Mock() + op.uuid = "test-uuid" + op.state = OperationState.ACTIVE_ATTEMPT + op.start_time_ns = 0 + op.first_response_latency = None + ActiveOperationMetric._active_operation_context.set(op) + exc = AioRpcError(0, Metadata(), Metadata(("a", "b"), ("c", "d"))) continuation = CrossSync.Mock(return_value=_make_mock_stream_call([1], exc=exc)) - details = mock.Mock() + details = ClientCallDetails() request = mock.Mock() wrapper = await 
instance.intercept_unary_stream(continuation, details, request) - with pytest.raises(ValueError) as e: + with pytest.raises(AioRpcError) as e: [val async for val in wrapper] assert e.value == exc continuation.assert_called_once_with(details, request) + assert op.first_response_latency_ns is not None + op.add_response_metadata.assert_called_once_with({"a": "b", "c": "d"}) @CrossSync.pytest async def test_unary_stream_interceptor_failure_start_stream(self): """Test that interceptor handles failures at start of stream with RpcError with metadata""" + instance = self._make_one() + op = mock.Mock() + op.uuid = "test-uuid" + op.state = OperationState.ACTIVE_ATTEMPT + op.start_time_ns = 0 + op.first_response_latency = None + ActiveOperationMetric._active_operation_context.set(op) + exc = RpcError("test") + exc.trailing_metadata = CrossSync.Mock(return_value=[("a", "b")]) + exc.initial_metadata = CrossSync.Mock(return_value=[("c", "d")]) + + continuation = CrossSync.Mock() + continuation.side_effect = exc + details = ClientCallDetails() + request = mock.Mock() + with pytest.raises(RpcError) as e: + await instance.intercept_unary_stream(continuation, details, request) + assert e.value == exc + continuation.assert_called_once_with(details, request) + assert op.first_response_latency_ns is not None + op.add_response_metadata.assert_called_once_with({"a": "b", "c": "d"}) + @CrossSync.pytest + async def test_unary_stream_interceptor_failure_start_stream_no_metadata(self): + """Test that interceptor handles failures at start of stream with RpcError with no metadata""" instance = self._make_one() + op = mock.Mock() + op.uuid = "test-uuid" + op.state = OperationState.ACTIVE_ATTEMPT + op.start_time_ns = 0 + op.first_response_latency = None + ActiveOperationMetric._active_operation_context.set(op) exc = RpcError("test") continuation = CrossSync.Mock() continuation.side_effect = exc - details = mock.Mock() + details = ClientCallDetails() request = mock.Mock() with pytest.raises(RpcError) as e: await instance.intercept_unary_stream(continuation, details, request) assert e.value == exc continuation.assert_called_once_with(details, request) + assert op.first_response_latency_ns is not None + op.add_response_metadata.assert_not_called() @CrossSync.pytest async def test_unary_stream_interceptor_failure_start_stream_generic(self): """Test that interceptor handles failures at start of stream with generic exception""" - instance = self._make_one() + op = mock.Mock() + op.uuid = "test-uuid" + op.state = OperationState.ACTIVE_ATTEMPT + op.start_time_ns = 0 + op.first_response_latency = None + ActiveOperationMetric._active_operation_context.set(op) exc = ValueError("test") continuation = CrossSync.Mock() continuation.side_effect = exc - details = mock.Mock() + details = ClientCallDetails() request = mock.Mock() with pytest.raises(ValueError) as e: await instance.intercept_unary_stream(continuation, details, request) assert e.value == exc continuation.assert_called_once_with(details, request) + assert op.first_response_latency_ns is not None + op.add_response_metadata.assert_not_called() + + @CrossSync.pytest + @pytest.mark.parametrize( + "initial_state", [OperationState.CREATED, OperationState.BETWEEN_ATTEMPTS] + ) + async def test_unary_unary_interceptor_start_operation(self, initial_state): + """if called with a newly created operation, it should be started""" + instance = self._make_one() + op = mock.Mock() + op.uuid = "test-uuid" + op.state = initial_state + ActiveOperationMetric._active_operation_context.set(op) + 
continuation = CrossSync.Mock() + call = continuation.return_value + call.trailing_metadata = CrossSync.Mock(return_value=[]) + call.initial_metadata = CrossSync.Mock(return_value=[]) + details = ClientCallDetails() + request = mock.Mock() + await instance.intercept_unary_unary(continuation, details, request) + op.start_attempt.assert_called_once() + + @CrossSync.pytest + @pytest.mark.parametrize( + "initial_state", [OperationState.CREATED, OperationState.BETWEEN_ATTEMPTS] + ) + async def test_unary_stream_interceptor_start_operation(self, initial_state): + """if called with a newly created operation, it should be started""" + instance = self._make_one() + op = mock.Mock() + op.uuid = "test-uuid" + op.state = initial_state + ActiveOperationMetric._active_operation_context.set(op) + + continuation = CrossSync.Mock(return_value=_make_mock_stream_call([1, 2])) + call = continuation.return_value + call.trailing_metadata = CrossSync.Mock(return_value=[]) + call.initial_metadata = CrossSync.Mock(return_value=[]) + details = ClientCallDetails() + request = mock.Mock() + await instance.intercept_unary_stream(continuation, details, request) + op.start_attempt.assert_called_once() diff --git a/tests/unit/data/_metrics/__init__.py b/tests/unit/data/_metrics/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/unit/data/_metrics/test_data_model.py b/tests/unit/data/_metrics/test_data_model.py new file mode 100644 index 000000000..42aa96093 --- /dev/null +++ b/tests/unit/data/_metrics/test_data_model.py @@ -0,0 +1,697 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import time +import pytest +import mock + +from google.cloud.bigtable.data._metrics.data_model import OperationState as State +from google.cloud.bigtable_v2.types import ResponseParams + + +class TestActiveOperationMetric: + def _make_one(self, *args, **kwargs): + from google.cloud.bigtable.data._metrics.data_model import ActiveOperationMetric + + return ActiveOperationMetric(*args, **kwargs) + + def test_ctor_defaults(self): + """ + create an instance with default values + """ + mock_type = mock.Mock() + metric = self._make_one(mock_type) + assert metric.op_type == mock_type + assert abs(metric.start_time_ns - time.monotonic_ns()) < 1e6 # 1ms buffer + assert metric.active_attempt is None + assert metric.cluster_id is None + assert metric.zone is None + assert len(metric.completed_attempts) == 0 + assert metric.was_completed is False + assert len(metric.handlers) == 0 + assert metric.is_streaming is False + assert metric.flow_throttling_time_ns == 0 + + def test_ctor_explicit(self): + """ + test with explicit arguments + """ + expected_type = mock.Mock() + expected_start_time_ns = 7 + expected_active_attempt = mock.Mock() + expected_cluster_id = "cluster" + expected_zone = "zone" + expected_completed_attempts = [mock.Mock()] + expected_was_completed = True + expected_handlers = [mock.Mock()] + expected_is_streaming = True + expected_flow_throttling = 12 + metric = self._make_one( + op_type=expected_type, + start_time_ns=expected_start_time_ns, + active_attempt=expected_active_attempt, + cluster_id=expected_cluster_id, + zone=expected_zone, + completed_attempts=expected_completed_attempts, + was_completed=expected_was_completed, + handlers=expected_handlers, + is_streaming=expected_is_streaming, + flow_throttling_time_ns=expected_flow_throttling, + ) + assert metric.op_type == expected_type + assert metric.start_time_ns == expected_start_time_ns + assert metric.active_attempt == expected_active_attempt + assert metric.cluster_id == expected_cluster_id + assert metric.zone == expected_zone + assert metric.completed_attempts == expected_completed_attempts + assert metric.was_completed == expected_was_completed + assert metric.handlers == expected_handlers + assert metric.is_streaming == expected_is_streaming + assert metric.flow_throttling_time_ns == expected_flow_throttling + + def test_state_machine_w_methods(self): + """ + Exercise the state machine by calling methods to move between states + """ + metric = self._make_one(mock.Mock()) + assert metric.state == State.CREATED + metric.start() + assert metric.state == State.CREATED + metric.start_attempt() + assert metric.state == State.ACTIVE_ATTEMPT + metric.end_attempt_with_status(Exception()) + assert metric.state == State.BETWEEN_ATTEMPTS + metric.start_attempt() + assert metric.state == State.ACTIVE_ATTEMPT + metric.end_with_success() + assert metric.state == State.COMPLETED + + def test_state_machine_w_state(self): + """ + Exercise state machine by directly manupulating state variables + + relevant variables are: active_attempt, completed_attempts, was_completed + """ + metric = self._make_one(mock.Mock()) + for was_completed_value in [False, True]: + metric.was_completed = was_completed_value + for active_operation_value in [None, mock.Mock()]: + metric.active_attempt = active_operation_value + for completed_attempts_value in [[], [mock.Mock()]]: + metric.completed_attempts = completed_attempts_value + if was_completed_value: + assert metric.state == State.COMPLETED + elif active_operation_value is not None: + assert metric.state == 
State.ACTIVE_ATTEMPT
+                    elif completed_attempts_value:
+                        assert metric.state == State.BETWEEN_ATTEMPTS
+                    else:
+                        assert metric.state == State.CREATED
+
+    @pytest.mark.parametrize(
+        "method,args,valid_states,error_method_name",
+        [
+            ("start", (), (State.CREATED,), None),
+            ("start_attempt", (), (State.CREATED, State.BETWEEN_ATTEMPTS), None),
+            ("add_response_metadata", ({},), (State.ACTIVE_ATTEMPT,), None),
+            ("end_attempt_with_status", (mock.Mock(),), (State.ACTIVE_ATTEMPT,), None),
+            (
+                "end_with_status",
+                (mock.Mock(),),
+                (
+                    State.CREATED,
+                    State.ACTIVE_ATTEMPT,
+                    State.BETWEEN_ATTEMPTS,
+                ),
+                None,
+            ),
+            (
+                "end_with_success",
+                (),
+                (
+                    State.CREATED,
+                    State.ACTIVE_ATTEMPT,
+                    State.BETWEEN_ATTEMPTS,
+                ),
+                "end_with_status",
+            ),
+        ],
+        ids=lambda x: x if isinstance(x, str) else "",
+    )
+    def test_error_invalid_states(self, method, args, valid_states, error_method_name):
+        """
+        each method only works for certain states. Make sure _handle_error is called for invalid states
+        """
+        cls = type(self._make_one(mock.Mock()))
+        invalid_states = set(State) - set(valid_states)
+        error_method_name = error_method_name or method
+        for state in invalid_states:
+            with mock.patch.object(cls, "_handle_error") as mock_handle_error:
+                mock_handle_error.return_value = None
+                metric = self._make_one(mock.Mock())
+                if state == State.ACTIVE_ATTEMPT:
+                    metric.active_attempt = mock.Mock()
+                elif state == State.BETWEEN_ATTEMPTS:
+                    metric.completed_attempts.append(mock.Mock())
+                elif state == State.COMPLETED:
+                    metric.was_completed = True
+                return_obj = getattr(metric, method)(*args)
+                assert return_obj is None
+                assert mock_handle_error.call_count == 1
+                assert (
+                    mock_handle_error.call_args[0][0]
+                    == f"Invalid state for {error_method_name}: {state}"
+                )
+
+    def test_start(self):
+        """
+        calling start on an operation should reset start_time
+        """
+        orig_time = 0
+        metric = self._make_one(mock.Mock(), start_time_ns=orig_time)
+        assert abs(metric.start_time_ns - time.monotonic_ns()) > 1e6  # 1ms buffer
+        metric.start()
+        assert metric.start_time_ns != orig_time
+        assert abs(metric.start_time_ns - time.monotonic_ns()) < 1e6  # 1ms buffer
+        # should remain in CREATED state after completing
+        assert metric.state == State.CREATED
+
+    def test_start_attempt(self):
+        """
+        calling start_attempt should create a new empty attempt metric
+        """
+        from google.cloud.bigtable.data._metrics.data_model import ActiveAttemptMetric
+
+        metric = self._make_one(mock.Mock())
+        assert metric.active_attempt is None
+        metric.start_attempt()
+        assert isinstance(metric.active_attempt, ActiveAttemptMetric)
+        # make sure it was initialized with the correct values
+        assert (
+            abs(time.monotonic_ns() - metric.active_attempt.start_time_ns) < 1e6
+        )  # 1ms buffer
+        assert metric.active_attempt.gfe_latency_ns is None
+        assert metric.active_attempt.grpc_throttling_time_ns == 0
+        # should be in ACTIVE_ATTEMPT state after completing
+        assert metric.state == State.ACTIVE_ATTEMPT
+
+    def test_start_attempt_with_backoff_generator(self):
+        """
+        If operation has a backoff generator, it should be used to attach backoff
+        times to attempts
+        """
+        from google.cloud.bigtable.data._helpers import TrackedBackoffGenerator
+
+        generator = TrackedBackoffGenerator()
+        # pre-seed generator with expected values
+        generator.history = list(range(10))
+        metric = self._make_one(mock.Mock(), backoff_generator=generator)
+        metric.start_attempt()
+        assert len(metric.completed_attempts) == 0
+        # first attempt should always be 0
+        assert
metric.active_attempt.backoff_before_attempt_ns == 0 + # later attempts should have their attempt number as backoff time + for i in range(10): + metric.end_attempt_with_status(mock.Mock()) + assert len(metric.completed_attempts) == i + 1 + metric.start_attempt() + # expect the backoff to be converted froms seconds to ns + assert metric.active_attempt.backoff_before_attempt_ns == (i * 1e9) + + @pytest.mark.parametrize( + "start_cluster,start_zone,metadata_proto,end_cluster,end_zone", + [ + (None, None, None, None, None), + ("orig_cluster", "orig_zone", None, "orig_cluster", "orig_zone"), + (None, None, ResponseParams(), None, None), + ( + "orig_cluster", + "orig_zone", + ResponseParams(), + "orig_cluster", + "orig_zone", + ), + ( + None, + None, + ResponseParams(cluster_id="test-cluster", zone_id="us-central1-b"), + "test-cluster", + "us-central1-b", + ), + ( + None, + "filled", + ResponseParams(cluster_id="cluster", zone_id="zone"), + "cluster", + "zone", + ), + (None, "filled", ResponseParams(cluster_id="cluster"), "cluster", "filled"), + (None, "filled", ResponseParams(zone_id="zone"), None, "zone"), + ( + "filled", + None, + ResponseParams(cluster_id="cluster", zone_id="zone"), + "cluster", + "zone", + ), + ("filled", None, ResponseParams(cluster_id="cluster"), "cluster", None), + ("filled", None, ResponseParams(zone_id="zone"), "filled", "zone"), + ], + ) + def test_add_response_metadata_cbt_header( + self, start_cluster, start_zone, metadata_proto, end_cluster, end_zone + ): + """ + calling add_response_metadata should update fields based on grpc response metadata + The x-goog-ext-425905942-bin field contains cluster and zone info + """ + import grpc + + cls = type(self._make_one(mock.Mock())) + with mock.patch.object(cls, "_handle_error") as mock_handle_error: + metric = self._make_one( + mock.Mock(), cluster_id=start_cluster, zone=start_zone + ) + metric.active_attempt = mock.Mock() + metric.active_attempt.gfe_latency_ns = None + metadata = grpc.aio.Metadata() + if metadata_proto is not None: + metadata["x-goog-ext-425905942-bin"] = ResponseParams.serialize( + metadata_proto + ) + metric.add_response_metadata(metadata) + assert metric.cluster_id == end_cluster + assert metric.zone == end_zone + # should remain in ACTIVE_ATTEMPT state after completing + assert metric.state == State.ACTIVE_ATTEMPT + # no errors encountered + assert mock_handle_error.call_count == 0 + # gfe latency should not be touched + assert metric.active_attempt.gfe_latency_ns is None + + @pytest.mark.parametrize( + "metadata_field", + [ + b"bad-input", + "cluster zone", # expect bytes + ], + ) + def test_add_response_metadata_cbt_header_w_error(self, metadata_field): + """ + If the x-goog-ext-425905942-bin field is present, but not structured properly, + _handle_error should be called + + Extra fields should not result in parsingerror + """ + import grpc + + cls = type(self._make_one(mock.Mock())) + with mock.patch.object(cls, "_handle_error") as mock_handle_error: + metric = self._make_one(mock.Mock()) + metric.cluster_id = None + metric.zone = None + metric.active_attempt = mock.Mock() + metadata = grpc.aio.Metadata() + metadata["x-goog-ext-425905942-bin"] = metadata_field + metric.add_response_metadata(metadata) + # should remain in ACTIVE_ATTEMPT state after completing + assert metric.state == State.ACTIVE_ATTEMPT + # no errors encountered + assert mock_handle_error.call_count == 1 + assert ( + "Failed to decode x-goog-ext-425905942-bin metadata:" + in mock_handle_error.call_args[0][0] + ) + assert 
str(metadata_field) in mock_handle_error.call_args[0][0] + + @pytest.mark.parametrize( + "metadata_field,expected_latency_ns", + [ + (None, None), + ("gfet4t7; dur=1000", 1000e6), + ("gfet4t7; dur=1000.0", 1000e6), + ("gfet4t7; dur=1000.1", 1000.1e6), + ("gcp; dur=15, gfet4t7; dur=300", 300e6), + ("gfet4t7;dur=350,gcp;dur=12", 350e6), + ("ignore_megfet4t7;dur=90ignore_me", 90e6), + ("gfet4t7;dur=2000", 2000e6), + ("gfet4t7; dur=0.001", 1000), + ("gfet4t7; dur=0.000001", 1), + ("gfet4t7; dur=0.0000001", 0), # below recording resolution + ("gfet4t7; dur=0", 0), + ("gfet4t7; dur=empty", None), + ("gfet4t7;", None), + ("", None), + ], + ) + def test_add_response_metadata_server_timing_header( + self, metadata_field, expected_latency_ns + ): + """ + calling add_response_metadata should update fields based on grpc response metadata + The server-timing field contains gfle latency info + """ + import grpc + + cls = type(self._make_one(mock.Mock())) + with mock.patch.object(cls, "_handle_error") as mock_handle_error: + metric = self._make_one(mock.Mock()) + metric.active_attempt = mock.Mock() + metric.active_attempt.gfe_latency_ns = None + metadata = grpc.aio.Metadata() + if metadata_field: + metadata["server-timing"] = metadata_field + metric.add_response_metadata(metadata) + if metric.active_attempt.gfe_latency_ns is None: + assert expected_latency_ns is None + else: + assert metric.active_attempt.gfe_latency_ns == int(expected_latency_ns) + # should remain in ACTIVE_ATTEMPT state after completing + assert metric.state == State.ACTIVE_ATTEMPT + # no errors encountered + assert mock_handle_error.call_count == 0 + # cluster and zone should not be touched + assert metric.cluster_id is None + assert metric.zone is None + + def test_end_attempt_with_status(self): + """ + ending the attempt should: + - add one to completed_attempts + - reset active_attempt to None + - update state + - notify handlers + """ + expected_start_time = 1 + expected_status = object() + expected_gfe_latency_ns = 5 + expected_app_blocking = 12 + expected_backoff = 2 + expected_grpc_throttle = 3 + handlers = [mock.Mock(), mock.Mock()] + + metric = self._make_one(mock.Mock(), handlers=handlers) + assert metric.active_attempt is None + assert len(metric.completed_attempts) == 0 + metric.start_attempt() + metric.active_attempt.start_time_ns = expected_start_time + metric.active_attempt.gfe_latency_ns = expected_gfe_latency_ns + metric.active_attempt.application_blocking_time_ns = expected_app_blocking + metric.active_attempt.backoff_before_attempt_ns = expected_backoff + metric.active_attempt.grpc_throttling_time_ns = expected_grpc_throttle + metric.end_attempt_with_status(expected_status) + assert len(metric.completed_attempts) == 1 + got_attempt = metric.completed_attempts[0] + expected_duration = time.monotonic_ns() - expected_start_time + assert abs(got_attempt.duration_ns - expected_duration) < 10e6 # within 10ms + assert got_attempt.grpc_throttling_time_ns == expected_grpc_throttle + assert got_attempt.end_status == expected_status + assert got_attempt.gfe_latency_ns == expected_gfe_latency_ns + assert got_attempt.application_blocking_time_ns == expected_app_blocking + assert got_attempt.backoff_before_attempt_ns == expected_backoff + # state should be changed to BETWEEN_ATTEMPTS + assert metric.state == State.BETWEEN_ATTEMPTS + # check handlers + for h in handlers: + assert h.on_attempt_complete.call_count == 1 + assert h.on_attempt_complete.call_args[0][0] == got_attempt + assert h.on_attempt_complete.call_args[0][1] == 
metric + + def test_end_attempt_with_status_w_exception(self): + """ + exception inputs should be converted to grpc status objects + """ + input_status = ValueError("test") + expected_status = object() + + metric = self._make_one(mock.Mock()) + metric.start_attempt() + with mock.patch.object( + metric, "_exc_to_status", return_value=expected_status + ) as mock_exc_to_status: + metric.end_attempt_with_status(input_status) + assert mock_exc_to_status.call_count == 1 + assert mock_exc_to_status.call_args[0][0] == input_status + assert metric.completed_attempts[0].end_status == expected_status + + def test_end_with_status(self): + """ + ending the operation should: + - end active attempt + - mark operation as completed + - update handlers + """ + from google.cloud.bigtable.data._metrics.data_model import ActiveAttemptMetric + + expected_attempt_start_time = 0 + expected_attempt_gfe_latency_ns = 5 + expected_flow_time = 16 + + expected_first_response_latency_ns = 9 + expected_status = object() + expected_type = object() + expected_start_time = 1 + expected_cluster = object() + expected_zone = object() + is_streaming = object() + + handlers = [mock.Mock(), mock.Mock()] + metric = self._make_one( + expected_type, handlers=handlers, start_time_ns=expected_start_time + ) + metric.cluster_id = expected_cluster + metric.zone = expected_zone + metric.is_streaming = is_streaming + metric.flow_throttling_time_ns = expected_flow_time + metric.first_response_latency_ns = expected_first_response_latency_ns + attempt = ActiveAttemptMetric( + start_time_ns=expected_attempt_start_time, + gfe_latency_ns=expected_attempt_gfe_latency_ns, + ) + metric.active_attempt = attempt + metric.end_with_status(expected_status) + # test that ActiveOperation was updated to terminal state + assert metric.state == State.COMPLETED + assert metric.was_completed is True + assert metric.active_attempt is None + assert len(metric.completed_attempts) == 1 + # check that finalized operation was passed to handlers + for h in handlers: + assert h.on_operation_complete.call_count == 1 + assert len(h.on_operation_complete.call_args[0]) == 1 + called_with = h.on_operation_complete.call_args[0][0] + assert called_with.op_type == expected_type + expected_duration = time.monotonic_ns() - expected_start_time + assert ( + abs(called_with.duration_ns - expected_duration) < 10e6 + ) # within 10ms + assert called_with.final_status == expected_status + assert called_with.cluster_id == expected_cluster + assert called_with.zone == expected_zone + assert called_with.is_streaming == is_streaming + assert called_with.flow_throttling_time_ns == expected_flow_time + assert ( + called_with.first_response_latency_ns + == expected_first_response_latency_ns + ) + # check the attempt + assert len(called_with.completed_attempts) == 1 + final_attempt = called_with.completed_attempts[0] + assert final_attempt.gfe_latency_ns == expected_attempt_gfe_latency_ns + assert final_attempt.end_status == expected_status + expected_duration = time.monotonic_ns() - expected_attempt_start_time + assert ( + abs(final_attempt.duration_ns - expected_duration) < 10e6 + ) # within 10ms + + def test_end_with_status_w_exception(self): + """ + exception inputs should be converted to grpc status objects + """ + input_status = ValueError("test") + expected_status = object() + handlers = [mock.Mock()] + + metric = self._make_one(mock.Mock(), handlers=handlers) + metric.start_attempt() + with mock.patch.object( + metric, "_exc_to_status", return_value=expected_status + ) as 
mock_exc_to_status: + metric.end_with_status(input_status) + assert mock_exc_to_status.call_count == 1 + assert mock_exc_to_status.call_args[0][0] == input_status + assert metric.completed_attempts[0].end_status == expected_status + final_op = handlers[0].on_operation_complete.call_args[0][0] + assert final_op.final_status == expected_status + + def test_end_with_status_with_default_cluster_zone(self): + """ + ending the operation should use default cluster and zone if not set + """ + from google.cloud.bigtable.data._metrics.data_model import ( + DEFAULT_CLUSTER_ID, + DEFAULT_ZONE, + ) + + handlers = [mock.Mock()] + metric = self._make_one(mock.Mock(), handlers=handlers) + assert metric.cluster_id is None + assert metric.zone is None + metric.end_with_status(mock.Mock()) + assert metric.state == State.COMPLETED + # check that finalized operation was passed to handlers + for h in handlers: + assert h.on_operation_complete.call_count == 1 + called_with = h.on_operation_complete.call_args[0][0] + assert called_with.cluster_id == DEFAULT_CLUSTER_ID + assert called_with.zone == DEFAULT_ZONE + + def test_end_with_success(self): + """ + end with success should be a pass-through helper for end_with_status + """ + from grpc import StatusCode + + inner_result = object() + + metric = self._make_one(mock.Mock()) + with mock.patch.object(metric, "end_with_status") as mock_end_with_status: + mock_end_with_status.return_value = inner_result + got_result = metric.end_with_success() + assert mock_end_with_status.call_count == 1 + assert mock_end_with_status.call_args[0][0] == StatusCode.OK + assert got_result is inner_result + + def test_end_on_empty_operation(self): + """ + Should be able to end an operation without any attempts + """ + from grpc import StatusCode + + handlers = [mock.Mock()] + metric = self._make_one(mock.Mock(), handlers=handlers) + metric.end_with_success() + assert metric.state == State.COMPLETED + assert metric.was_completed is True + final_op = handlers[0].on_operation_complete.call_args[0][0] + assert final_op.final_status == StatusCode.OK + assert final_op.completed_attempts == [] + + def test__exc_to_status(self): + """ + Should return grpc_status_code if grpc error, otherwise UNKNOWN + + If BigtableExceptionGroup, use the most recent exception in the group + """ + from grpc import StatusCode + from google.api_core import exceptions as core_exc + from google.cloud.bigtable.data import exceptions as bt_exc + + cls = type(self._make_one(object())) + # unknown for non-grpc errors + assert cls._exc_to_status(ValueError()) == StatusCode.UNKNOWN + assert cls._exc_to_status(RuntimeError()) == StatusCode.UNKNOWN + # grpc status code for grpc errors + assert ( + cls._exc_to_status(core_exc.InvalidArgument("msg")) + == StatusCode.INVALID_ARGUMENT + ) + assert cls._exc_to_status(core_exc.NotFound("msg")) == StatusCode.NOT_FOUND + assert ( + cls._exc_to_status(core_exc.AlreadyExists("msg")) + == StatusCode.ALREADY_EXISTS + ) + assert ( + cls._exc_to_status(core_exc.PermissionDenied("msg")) + == StatusCode.PERMISSION_DENIED + ) + cause_exc = core_exc.AlreadyExists("msg") + w_cause = core_exc.DeadlineExceeded("msg") + w_cause.__cause__ = cause_exc + assert cls._exc_to_status(w_cause) == StatusCode.DEADLINE_EXCEEDED + # use cause if available + w_cause = ValueError("msg") + w_cause.__cause__ = cause_exc + cause_exc.grpc_status_code = object() + custom_excs = [ + bt_exc.FailedMutationEntryError(1, mock.Mock(), cause=cause_exc), + bt_exc.FailedQueryShardError(1, {}, cause=cause_exc), + w_cause, + 
] + for exc in custom_excs: + assert cls._exc_to_status(exc) == cause_exc.grpc_status_code, exc + # extract most recent exception for bigtable exception groups + exc_groups = [ + bt_exc._BigtableExceptionGroup("", [ValueError(), cause_exc]), + bt_exc.RetryExceptionGroup([RuntimeError(), cause_exc]), + bt_exc.ShardedReadRowsExceptionGroup( + [bt_exc.FailedQueryShardError(1, {}, cause=cause_exc)], [], 2 + ), + bt_exc.MutationsExceptionGroup( + [bt_exc.FailedMutationEntryError(1, mock.Mock(), cause=cause_exc)], 2 + ), + ] + for exc in exc_groups: + assert cls._exc_to_status(exc) == cause_exc.grpc_status_code, exc + + def test__handle_error(self): + """ + handle_error should write log + """ + input_message = "test message" + expected_message = f"Error in Bigtable Metrics: {input_message}" + with mock.patch( + "google.cloud.bigtable.data._metrics.data_model.LOGGER" + ) as logger_mock: + type(self._make_one(object()))._handle_error(input_message) + assert logger_mock.warning.call_count == 1 + assert logger_mock.warning.call_args[0][0] == expected_message + assert len(logger_mock.warning.call_args[0]) == 1 + + @pytest.mark.asyncio + async def test_context_manager(self): + """ + Should implement context manager protocol + """ + metric = self._make_one(object()) + with mock.patch.object(metric, "end_with_success") as end_with_success_mock: + end_with_success_mock.side_effect = lambda: metric.end_with_status(object()) + with metric as context: + assert context == metric + # inside context manager, still active + assert end_with_success_mock.call_count == 0 + assert metric.state == State.CREATED + # outside context manager, should be ended + assert end_with_success_mock.call_count == 1 + assert metric.state == State.COMPLETED + + @pytest.mark.asyncio + async def test_context_manager_exception(self): + """ + Exception within context manager causes end_with_status to be called with error + """ + expected_exc = ValueError("expected") + metric = self._make_one(object()) + with mock.patch.object(metric, "end_with_status") as end_with_status_mock: + try: + with metric: + # inside context manager, still active + assert end_with_status_mock.call_count == 0 + assert metric.state == State.CREATED + raise expected_exc + except ValueError as e: + assert e == expected_exc + # outside context manager, should be ended + assert end_with_status_mock.call_count == 1 + assert end_with_status_mock.call_args[0][0] == expected_exc diff --git a/tests/unit/data/_metrics/test_metrics_controller.py b/tests/unit/data/_metrics/test_metrics_controller.py new file mode 100644 index 000000000..2f5eff700 --- /dev/null +++ b/tests/unit/data/_metrics/test_metrics_controller.py @@ -0,0 +1,96 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+import mock
+
+
+class TestBigtableClientSideMetricsController:
+    def _make_one(self, *args, **kwargs):
+        from google.cloud.bigtable.data._metrics import (
+            BigtableClientSideMetricsController,
+        )
+
+        return BigtableClientSideMetricsController(*args, **kwargs)
+
+    def test_ctor_defaults(self):
+        """
+        should create an instance with no handlers registered by default
+        """
+        instance = self._make_one()
+        assert len(instance.handlers) == 0
+
+    def test_ctor_custom_handlers(self):
+        """
+        if handlers are passed to the constructor, they should be used instead
+        """
+        custom_handler = object()
+        custom_interceptor = object()
+        controller = self._make_one(custom_interceptor, handlers=[custom_handler])
+        assert controller.interceptor == custom_interceptor
+        assert len(controller.handlers) == 1
+        assert controller.handlers[0] is custom_handler
+
+    def test_add_handler(self):
+        """
+        New handlers should be added to the handler list
+        """
+        controller = self._make_one(handlers=[object()])
+        initial_handler_count = len(controller.handlers)
+        new_handler = object()
+        controller.add_handler(new_handler)
+        assert len(controller.handlers) == initial_handler_count + 1
+        assert controller.handlers[-1] is new_handler
+
+    def test_create_operation_mock(self):
+        """
+        All args should be passed through, as well as the handlers
+        """
+        from google.cloud.bigtable.data._metrics import ActiveOperationMetric
+
+        controller = self._make_one(handlers=[object()])
+        arg = object()
+        kwargs = {"a": 1, "b": 2}
+        with mock.patch(
+            "google.cloud.bigtable.data._metrics.ActiveOperationMetric.__init__"
+        ) as mock_op:
+            mock_op.return_value = None
+            op = controller.create_operation(arg, **kwargs)
+            assert isinstance(op, ActiveOperationMetric)
+            assert mock_op.call_count == 1
+            mock_op.assert_called_with(arg, **kwargs, handlers=controller.handlers)
+
+    def test_create_operation(self):
+        from google.cloud.bigtable.data._metrics import ActiveOperationMetric
+
+        handler = object()
+        expected_type = object()
+        expected_is_streaming = True
+        expected_zone = object()
+        controller = self._make_one(handlers=[handler])
+        op = controller.create_operation(
+            expected_type, is_streaming=expected_is_streaming, zone=expected_zone
+        )
+        assert isinstance(op, ActiveOperationMetric)
+        assert op.op_type is expected_type
+        assert op.is_streaming is expected_is_streaming
+        assert op.zone is expected_zone
+        assert len(op.handlers) == 1
+        assert op.handlers[0] is handler
+
+    def test_close(self):
+        handlers = [mock.Mock() for _ in range(3)]
+        controller = self._make_one(handlers=handlers)
+        controller.close()
+        for handler in handlers:
+            handler.close.assert_called_once()
diff --git a/tests/unit/data/_sync_autogen/test_client.py b/tests/unit/data/_sync_autogen/test_client.py
index 506ad7e94..42f5388ee 100644
--- a/tests/unit/data/_sync_autogen/test_client.py
+++ b/tests/unit/data/_sync_autogen/test_client.py
@@ -47,9 +47,13 @@
 )
 from google.api_core import grpc_helpers
 from google.cloud.bigtable.data._sync_autogen._swappable_channel import SwappableChannel
+from google.cloud.bigtable.data._sync_autogen.metrics_interceptor import (
+    BigtableMetricsInterceptor,
+)
 
 CrossSync._Sync_Impl.add_mapping("grpc_helpers", grpc_helpers)
 CrossSync._Sync_Impl.add_mapping("SwappableChannel", SwappableChannel)
+CrossSync._Sync_Impl.add_mapping("MetricsInterceptor", BigtableMetricsInterceptor)
 
 
 @CrossSync._Sync_Impl.add_mapping_decorator("TestBigtableDataClient")
@@ -85,6 +89,9 @@ def test_ctor(self):
         assert not client._active_instances
         assert client._channel_refresh_task is not None
         assert 
client.transport._credentials == expected_credentials + assert isinstance( + client._metrics_interceptor, CrossSync._Sync_Impl.MetricsInterceptor + ) client.close() def test_ctor_super_inits(self): @@ -931,6 +938,9 @@ def _make_one( def test_ctor(self): from google.cloud.bigtable.data._helpers import _WarmedInstanceKey + from google.cloud.bigtable.data._metrics import ( + BigtableClientSideMetricsController, + ) expected_table_id = "table-id" expected_instance_id = "instance-id" @@ -971,6 +981,7 @@ def test_ctor(self): instance_key = _WarmedInstanceKey(table.instance_name, table.app_profile_id) assert instance_key in client._active_instances assert client._instance_owners[instance_key] == {id(table)} + assert isinstance(table._metrics, BigtableClientSideMetricsController) assert table.default_operation_timeout == expected_operation_timeout assert table.default_attempt_timeout == expected_attempt_timeout assert ( @@ -1162,6 +1173,19 @@ def test_call_metadata(self, include_app_profile, fn_name, fn_args, gapic_fn): else: assert "app_profile_id=" in routing_str + def test_close(self): + client = self._make_client() + table = self._make_one(client) + with mock.patch.object( + table._metrics, "close", mock.Mock() + ) as metric_close_mock: + with mock.patch.object( + client, "_remove_instance_registration" + ) as remove_mock: + table.close() + remove_mock.assert_called_once_with(table.instance_id, table) + metric_close_mock.assert_called_once() + @CrossSync._Sync_Impl.add_mapping_decorator("TestAuthorizedView") class TestAuthorizedView(CrossSync._Sync_Impl.TestTable): @@ -1188,6 +1212,9 @@ def _make_one( def test_ctor(self): from google.cloud.bigtable.data._helpers import _WarmedInstanceKey + from google.cloud.bigtable.data._metrics import ( + BigtableClientSideMetricsController, + ) expected_table_id = "table-id" expected_instance_id = "instance-id" @@ -1235,6 +1262,7 @@ def test_ctor(self): instance_key = _WarmedInstanceKey(view.instance_name, view.app_profile_id) assert instance_key in client._active_instances assert client._instance_owners[instance_key] == {id(view)} + assert isinstance(view._metrics, BigtableClientSideMetricsController) assert view.default_operation_timeout == expected_operation_timeout assert view.default_attempt_timeout == expected_attempt_timeout assert ( diff --git a/tests/unit/data/_sync_autogen/test_metrics_interceptor.py b/tests/unit/data/_sync_autogen/test_metrics_interceptor.py index 31430ad84..c4efcc5b9 100644 --- a/tests/unit/data/_sync_autogen/test_metrics_interceptor.py +++ b/tests/unit/data/_sync_autogen/test_metrics_interceptor.py @@ -17,6 +17,9 @@ import pytest from grpc import RpcError +from grpc import ClientCallDetails +from google.cloud.bigtable.data._metrics.data_model import ActiveOperationMetric +from google.cloud.bigtable.data._metrics.data_model import OperationState from google.cloud.bigtable.data._cross_sync import CrossSync try: @@ -50,91 +53,255 @@ def _get_target_class(): def _make_one(self, *args, **kwargs): return self._get_target_class()(*args, **kwargs) + def test_unary_unary_interceptor_op_not_found(self): + """Test that interceptor call continuation if op is not found""" + instance = self._make_one() + continuation = CrossSync._Sync_Impl.Mock() + details = ClientCallDetails() + details.metadata = [] + request = mock.Mock() + instance.intercept_unary_unary(continuation, details, request) + continuation.assert_called_once_with(details, request) + def test_unary_unary_interceptor_success(self): """Test that interceptor handles successful 
unary-unary calls""" instance = self._make_one() + op = mock.Mock() + op.uuid = "test-uuid" + op.state = OperationState.ACTIVE_ATTEMPT + ActiveOperationMetric._active_operation_context.set(op) continuation = CrossSync._Sync_Impl.Mock() call = continuation.return_value - details = mock.Mock() + call.trailing_metadata = CrossSync._Sync_Impl.Mock(return_value=[("a", "b")]) + call.initial_metadata = CrossSync._Sync_Impl.Mock(return_value=[("c", "d")]) + details = ClientCallDetails() request = mock.Mock() result = instance.intercept_unary_unary(continuation, details, request) assert result == call continuation.assert_called_once_with(details, request) + op.add_response_metadata.assert_called_once_with({"a": "b", "c": "d"}) + op.end_attempt_with_status.assert_not_called() def test_unary_unary_interceptor_failure(self): """test a failed RpcError with metadata""" instance = self._make_one() + op = mock.Mock() + op.uuid = "test-uuid" + op.state = OperationState.ACTIVE_ATTEMPT + ActiveOperationMetric._active_operation_context.set(op) + exc = RpcError("test") + exc.trailing_metadata = CrossSync._Sync_Impl.Mock(return_value=[("a", "b")]) + exc.initial_metadata = CrossSync._Sync_Impl.Mock(return_value=[("c", "d")]) + continuation = CrossSync._Sync_Impl.Mock(side_effect=exc) + details = ClientCallDetails() + request = mock.Mock() + with pytest.raises(RpcError) as e: + instance.intercept_unary_unary(continuation, details, request) + assert e.value == exc + continuation.assert_called_once_with(details, request) + op.add_response_metadata.assert_called_once_with({"a": "b", "c": "d"}) + + def test_unary_unary_interceptor_failure_no_metadata(self): + """test with RpcError without without metadata attached""" + instance = self._make_one() + op = mock.Mock() + op.uuid = "test-uuid" + op.state = OperationState.ACTIVE_ATTEMPT + ActiveOperationMetric._active_operation_context.set(op) exc = RpcError("test") continuation = CrossSync._Sync_Impl.Mock(side_effect=exc) - details = mock.Mock() + call = continuation.return_value + call.trailing_metadata = CrossSync._Sync_Impl.Mock(return_value=[("a", "b")]) + call.initial_metadata = CrossSync._Sync_Impl.Mock(return_value=[("c", "d")]) + details = ClientCallDetails() request = mock.Mock() with pytest.raises(RpcError) as e: instance.intercept_unary_unary(continuation, details, request) assert e.value == exc continuation.assert_called_once_with(details, request) + op.add_response_metadata.assert_not_called() def test_unary_unary_interceptor_failure_generic(self): """test generic exception""" instance = self._make_one() + op = mock.Mock() + op.uuid = "test-uuid" + op.state = OperationState.ACTIVE_ATTEMPT + ActiveOperationMetric._active_operation_context.set(op) exc = ValueError("test") continuation = CrossSync._Sync_Impl.Mock(side_effect=exc) - details = mock.Mock() + call = continuation.return_value + call.trailing_metadata = CrossSync._Sync_Impl.Mock(return_value=[("a", "b")]) + call.initial_metadata = CrossSync._Sync_Impl.Mock(return_value=[("c", "d")]) + details = ClientCallDetails() request = mock.Mock() with pytest.raises(ValueError) as e: instance.intercept_unary_unary(continuation, details, request) assert e.value == exc continuation.assert_called_once_with(details, request) + op.add_response_metadata.assert_not_called() + + def test_unary_stream_interceptor_op_not_found(self): + """Test that interceptor calls continuation if op is not found""" + instance = self._make_one() + continuation = CrossSync._Sync_Impl.Mock() + details = ClientCallDetails() + 
details.metadata = [] + request = mock.Mock() + instance.intercept_unary_stream(continuation, details, request) + continuation.assert_called_once_with(details, request) def test_unary_stream_interceptor_success(self): """Test that interceptor handles successful unary-stream calls""" instance = self._make_one() + op = mock.Mock() + op.uuid = "test-uuid" + op.state = OperationState.ACTIVE_ATTEMPT + op.start_time_ns = 0 + op.first_response_latency = None + ActiveOperationMetric._active_operation_context.set(op) continuation = CrossSync._Sync_Impl.Mock( return_value=_make_mock_stream_call([1, 2]) ) - details = mock.Mock() + call = continuation.return_value + call.trailing_metadata = CrossSync._Sync_Impl.Mock(return_value=[("a", "b")]) + call.initial_metadata = CrossSync._Sync_Impl.Mock(return_value=[("c", "d")]) + details = ClientCallDetails() request = mock.Mock() wrapper = instance.intercept_unary_stream(continuation, details, request) results = [val for val in wrapper] assert results == [1, 2] continuation.assert_called_once_with(details, request) + assert op.first_response_latency_ns is not None + op.add_response_metadata.assert_called_once_with({"a": "b", "c": "d"}) + op.end_attempt_with_status.assert_not_called() def test_unary_stream_interceptor_failure_mid_stream(self): """Test that interceptor handles failures mid-stream""" + from grpc.aio import AioRpcError, Metadata + instance = self._make_one() - exc = ValueError("test") + op = mock.Mock() + op.uuid = "test-uuid" + op.state = OperationState.ACTIVE_ATTEMPT + op.start_time_ns = 0 + op.first_response_latency = None + ActiveOperationMetric._active_operation_context.set(op) + exc = AioRpcError(0, Metadata(), Metadata(("a", "b"), ("c", "d"))) continuation = CrossSync._Sync_Impl.Mock( return_value=_make_mock_stream_call([1], exc=exc) ) - details = mock.Mock() + details = ClientCallDetails() request = mock.Mock() wrapper = instance.intercept_unary_stream(continuation, details, request) - with pytest.raises(ValueError) as e: + with pytest.raises(AioRpcError) as e: [val for val in wrapper] assert e.value == exc continuation.assert_called_once_with(details, request) + assert op.first_response_latency_ns is not None + op.add_response_metadata.assert_called_once_with({"a": "b", "c": "d"}) def test_unary_stream_interceptor_failure_start_stream(self): """Test that interceptor handles failures at start of stream with RpcError with metadata""" instance = self._make_one() + op = mock.Mock() + op.uuid = "test-uuid" + op.state = OperationState.ACTIVE_ATTEMPT + op.start_time_ns = 0 + op.first_response_latency = None + ActiveOperationMetric._active_operation_context.set(op) + exc = RpcError("test") + exc.trailing_metadata = CrossSync._Sync_Impl.Mock(return_value=[("a", "b")]) + exc.initial_metadata = CrossSync._Sync_Impl.Mock(return_value=[("c", "d")]) + continuation = CrossSync._Sync_Impl.Mock() + continuation.side_effect = exc + details = ClientCallDetails() + request = mock.Mock() + with pytest.raises(RpcError) as e: + instance.intercept_unary_stream(continuation, details, request) + assert e.value == exc + continuation.assert_called_once_with(details, request) + assert op.first_response_latency_ns is not None + op.add_response_metadata.assert_called_once_with({"a": "b", "c": "d"}) + + def test_unary_stream_interceptor_failure_start_stream_no_metadata(self): + """Test that interceptor handles failures at start of stream with RpcError with no metadata""" + instance = self._make_one() + op = mock.Mock() + op.uuid = "test-uuid" + op.state = 
OperationState.ACTIVE_ATTEMPT + op.start_time_ns = 0 + op.first_response_latency = None + ActiveOperationMetric._active_operation_context.set(op) exc = RpcError("test") continuation = CrossSync._Sync_Impl.Mock() continuation.side_effect = exc - details = mock.Mock() + details = ClientCallDetails() request = mock.Mock() with pytest.raises(RpcError) as e: instance.intercept_unary_stream(continuation, details, request) assert e.value == exc continuation.assert_called_once_with(details, request) + assert op.first_response_latency_ns is not None + op.add_response_metadata.assert_not_called() def test_unary_stream_interceptor_failure_start_stream_generic(self): """Test that interceptor handles failures at start of stream with generic exception""" instance = self._make_one() + op = mock.Mock() + op.uuid = "test-uuid" + op.state = OperationState.ACTIVE_ATTEMPT + op.start_time_ns = 0 + op.first_response_latency = None + ActiveOperationMetric._active_operation_context.set(op) exc = ValueError("test") continuation = CrossSync._Sync_Impl.Mock() continuation.side_effect = exc - details = mock.Mock() + details = ClientCallDetails() request = mock.Mock() with pytest.raises(ValueError) as e: instance.intercept_unary_stream(continuation, details, request) assert e.value == exc continuation.assert_called_once_with(details, request) + assert op.first_response_latency_ns is not None + op.add_response_metadata.assert_not_called() + + @pytest.mark.parametrize( + "initial_state", [OperationState.CREATED, OperationState.BETWEEN_ATTEMPTS] + ) + def test_unary_unary_interceptor_start_operation(self, initial_state): + """if called with a newly created operation, it should be started""" + instance = self._make_one() + op = mock.Mock() + op.uuid = "test-uuid" + op.state = initial_state + ActiveOperationMetric._active_operation_context.set(op) + continuation = CrossSync._Sync_Impl.Mock() + call = continuation.return_value + call.trailing_metadata = CrossSync._Sync_Impl.Mock(return_value=[]) + call.initial_metadata = CrossSync._Sync_Impl.Mock(return_value=[]) + details = ClientCallDetails() + request = mock.Mock() + instance.intercept_unary_unary(continuation, details, request) + op.start_attempt.assert_called_once() + + @pytest.mark.parametrize( + "initial_state", [OperationState.CREATED, OperationState.BETWEEN_ATTEMPTS] + ) + def test_unary_stream_interceptor_start_operation(self, initial_state): + """if called with a newly created operation, it should be started""" + instance = self._make_one() + op = mock.Mock() + op.uuid = "test-uuid" + op.state = initial_state + ActiveOperationMetric._active_operation_context.set(op) + continuation = CrossSync._Sync_Impl.Mock( + return_value=_make_mock_stream_call([1, 2]) + ) + call = continuation.return_value + call.trailing_metadata = CrossSync._Sync_Impl.Mock(return_value=[]) + call.initial_metadata = CrossSync._Sync_Impl.Mock(return_value=[]) + details = ClientCallDetails() + request = mock.Mock() + instance.intercept_unary_stream(continuation, details, request) + op.start_attempt.assert_called_once() diff --git a/tests/unit/data/test__helpers.py b/tests/unit/data/test__helpers.py index 96c726a20..c8540024d 100644 --- a/tests/unit/data/test__helpers.py +++ b/tests/unit/data/test__helpers.py @@ -266,3 +266,98 @@ def test_get_retryable_errors(self, input_codes, input_table, expected): setattr(fake_table, f"{key}_retryable_errors", input_table[key]) result = _helpers._get_retryable_errors(input_codes, fake_table) assert result == expected + + +class TestTrackedBackoffGenerator: + 
def test_tracked_backoff_generator_history(self):
+        """
+        Should be able to retrieve historical results from backoff generator
+        """
+        generator = _helpers.TrackedBackoffGenerator(
+            initial=0, multiplier=2, maximum=10
+        )
+        got_list = [next(generator) for _ in range(20)]
+
+        # check all values are correct
+        for i in range(19, 0, -1):
+            assert generator.get_attempt_backoff(i) == got_list[i]
+        # check a random value out of order
+        assert generator.get_attempt_backoff(5) == got_list[5]
+
+    @mock.patch("random.uniform", side_effect=lambda a, b: b)
+    def test_tracked_backoff_generator_defaults(self, mock_uniform):
+        """
+        Should generate values with default parameters
+
+        initial=0.01, multiplier=2, maximum=60
+        """
+        generator = _helpers.TrackedBackoffGenerator()
+        expected_values = [0.01, 0.02, 0.04, 0.08, 0.16]
+        for expected in expected_values:
+            assert next(generator) == pytest.approx(expected)
+
+    @mock.patch("random.uniform", side_effect=lambda a, b: b)
+    def test_tracked_backoff_generator_with_maximum(self, mock_uniform):
+        """
+        Should cap the backoff at the maximum value
+        """
+        generator = _helpers.TrackedBackoffGenerator(initial=1, multiplier=2, maximum=5)
+        expected_values = [1, 2, 4, 5, 5, 5]
+        for expected in expected_values:
+            assert next(generator) == expected
+
+    def test_get_attempt_backoff_out_of_bounds(self):
+        """
+        get_attempt_backoff should raise IndexError for out of bounds index
+        """
+        generator = _helpers.TrackedBackoffGenerator()
+        next(generator)
+        next(generator)
+        with pytest.raises(IndexError):
+            generator.get_attempt_backoff(2)
+        with pytest.raises(IndexError):
+            generator.get_attempt_backoff(-3)
+
+    def test_set_next_full_set(self):
+        """
+        populate the generator exclusively via set_next
+        """
+        generator = _helpers.TrackedBackoffGenerator()
+        for idx, val in enumerate(range(100, 0, -1)):
+            generator.set_next(val)
+            got = next(generator)
+            assert got == val
+            assert generator.get_attempt_backoff(idx) == val
+
+    def test_set_next_negative_value(self):
+        generator = _helpers.TrackedBackoffGenerator()
+        with pytest.raises(ValueError):
+            generator.set_next(-1)
+
+    @mock.patch("random.uniform", side_effect=lambda a, b: b)
+    def test_interleaved_set_next(self, mock_uniform):
+        import itertools
+
+        generator = _helpers.TrackedBackoffGenerator(
+            initial=1, multiplier=2, maximum=128
+        )
+        # values we expect generator to create
+        expected_values = [2**i for i in range(8)]
+        # values we will insert
+        inserted_values = [9, 61, 0, 4, 33, 12, 18, 2]
+        for idx in range(8):
+            assert next(generator) == expected_values[idx]
+            generator.set_next(inserted_values[idx])
+            assert next(generator) == inserted_values[idx]
+        # check to make sure history is as we expect
+        assert generator.history == list(
+            itertools.chain.from_iterable(zip(expected_values, inserted_values))
+        )
+
+    @mock.patch("random.uniform", side_effect=lambda a, b: b)
+    def test_set_next_replacement(self, mock_uniform):
+        generator = _helpers.TrackedBackoffGenerator(initial=1)
+        generator.set_next(99)
+        generator.set_next(88)
+        assert next(generator) == 88
+        assert next(generator) == 1
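
Reviewer note: for orientation, below is a minimal sketch of the client-side flow these unit tests exercise, written against the internal surface the tests imply (create_operation, start_attempt, add_response_metadata, the operation context manager, and the handler callbacks). The PrintHandler class and the bare object() op_type are illustrative placeholders, not part of this change.

from google.cloud.bigtable.data._metrics import BigtableClientSideMetricsController


class PrintHandler:
    # illustrative handler; real handlers would typically extend MetricsHandler
    def on_attempt_complete(self, attempt, operation):
        print("attempt finished with status", attempt.end_status)

    def on_operation_complete(self, operation):
        print("operation finished in", operation.duration_ns, "ns")

    def close(self):
        pass


controller = BigtableClientSideMetricsController(handlers=[PrintHandler()])
# op_type would normally be an OperationType member; object() keeps the sketch self-contained
with controller.create_operation(object(), is_streaming=False) as op:
    op.start_attempt()
    # metadata is normally attached by the gRPC interceptor; a plain dict works the same way
    op.add_response_metadata({"server-timing": "gfet4t7; dur=12"})
# leaving the context manager calls end_with_success(), which notifies on_operation_complete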
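
Reviewer note: the server-timing parametrization above reduces to "extract the gfet4t7 dur value (milliseconds) and convert it to integer nanoseconds". Below is a standalone sketch of that conversion, consistent with the expected values in the table; the actual helper in data_model.py may differ in detail.

import re


def parse_gfe_latency_ns(server_timing_header):
    # dur is reported in milliseconds; return None when no gfet4t7 entry is present
    match = re.search(r"gfet4t7;\s*dur=([0-9.]+)", server_timing_header)
    if match is None:
        return None
    return int(float(match.group(1)) * 1_000_000)  # ms -> ns, truncating below 1 ns


assert parse_gfe_latency_ns("gcp; dur=15, gfet4t7; dur=300") == 300_000_000
assert parse_gfe_latency_ns("gfet4t7; dur=0.0000001") == 0  # below recording resolution
assert parse_gfe_latency_ns("gfet4t7;") is None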
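
Reviewer note: the x-goog-ext-425905942-bin cases rely on the ResponseParams proto round-trip. A short illustration of that encoding using the proto-plus serialize/deserialize helpers the tests already call; the field values are the same ones used in the parametrization above.

from google.cloud.bigtable_v2.types import ResponseParams

params = ResponseParams(cluster_id="test-cluster", zone_id="us-central1-b")
wire_bytes = ResponseParams.serialize(params)  # the value placed under the -bin metadata key
decoded = ResponseParams.deserialize(wire_bytes)
assert decoded.cluster_id == "test-cluster"
assert decoded.zone_id == "us-central1-b"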