Draft: changes from all commits (71 commits)
d2175f1
use replaceable channel wrapper
daniel-sanche Jul 25, 2025
5e107fc
got unit tests working
daniel-sanche Jul 26, 2025
c4a97e1
put back in cache invalidation
daniel-sanche Jul 26, 2025
e71b1d5
added wrapped multicallables to avoid cache invalidation
daniel-sanche Jul 26, 2025
b81a9be
added crosssync, moved close logic back to client
daniel-sanche Jul 29, 2025
a1dffb5
generated sync code
daniel-sanche Jul 29, 2025
e3ec02b
got tests running
daniel-sanche Jul 29, 2025
4e13783
fixed tests
daniel-sanche Jul 29, 2025
7d90a04
remove extra wrapper; added invalidate_stubs helper
daniel-sanche Jul 29, 2025
26cd601
fixed lint
daniel-sanche Jul 29, 2025
375332f
fixed lint
daniel-sanche Jul 29, 2025
428d75a
renamed replaceablechannel to swappablechannel
daniel-sanche Jul 29, 2025
4b39bc5
added tests
daniel-sanche Jul 29, 2025
3f090c2
added docstrings
daniel-sanche Jul 29, 2025
883ceab
Merge branch 'main' into refactor_refresh
daniel-sanche Jul 29, 2025
04c762a
initial commit
daniel-sanche Jul 29, 2025
29dff4d
added back interceptor
daniel-sanche Jul 29, 2025
e4f8238
added metrics to client
daniel-sanche Jul 29, 2025
fcb062e
fixed lint
daniel-sanche Aug 1, 2025
ac8dbe4
Merge branch 'refactor_refresh' into csm_1_data_model
daniel-sanche Aug 1, 2025
d155f8a
set up channel interceptions
daniel-sanche Aug 2, 2025
9fece96
added TrackedBackoffGenerator
daniel-sanche Aug 2, 2025
aec2577
fixed lint
daniel-sanche Aug 2, 2025
ec4e847
fixed import
daniel-sanche Aug 2, 2025
8f99e4e
added operation.cancel
daniel-sanche Aug 6, 2025
f8e6603
added operation cancelled to interceptor
daniel-sanche Aug 6, 2025
f5e057e
gave each operation a uuid
daniel-sanche Aug 6, 2025
8c397bb
return attempt metric on new attempt
daniel-sanche Aug 7, 2025
2c34198
use standard context manager
daniel-sanche Aug 7, 2025
9bd1e07
use default backoff generator
daniel-sanche Aug 7, 2025
96d1355
require backoff; refactor check
daniel-sanche Aug 7, 2025
de5d07b
fixed context manager naming; lint
daniel-sanche Aug 7, 2025
d73f379
moved first_response_latency to operation
daniel-sanche Aug 7, 2025
a2070f9
Merge branch 'main' into refactor_refresh
daniel-sanche Aug 8, 2025
4a4f80a
fixed import
daniel-sanche Aug 8, 2025
5ea9f0e
Merge branch 'main' into csm_1_data_model
daniel-sanche Aug 11, 2025
708a35a
fixed broken unit tests
daniel-sanche Aug 11, 2025
67c08fd
Merge branch 'refactor_refresh' into csm_1_data_model
daniel-sanche Aug 11, 2025
5ae7acc
added set_next to TrackedBackoffGenerator
daniel-sanche Aug 11, 2025
cb32296
added assertions to test_client
daniel-sanche Aug 12, 2025
f07e765
added new test metrics interceptor file
daniel-sanche Aug 26, 2025
a34c01e
first round of tests
daniel-sanche Aug 26, 2025
84f61ee
added metadata capture for failed rpcs
daniel-sanche Aug 26, 2025
d4ae637
added test for starting attempts
daniel-sanche Aug 26, 2025
1fbcadd
added sync tests
daniel-sanche Aug 26, 2025
edacd04
got tests passing
daniel-sanche Aug 26, 2025
c628d21
removed helper class
daniel-sanche Aug 26, 2025
6d585ec
refactoring
daniel-sanche Aug 26, 2025
01e6b36
refactored interceptor
daniel-sanche Aug 26, 2025
05fe577
added unit tests for interceptor
daniel-sanche Aug 26, 2025
4871abd
Merge branch 'main' into csm_1_data_model
daniel-sanche Aug 26, 2025
b6eac6c
fixed lint
daniel-sanche Aug 26, 2025
019a8c2
Merge branch 'csm_interceptor' into csm_1_data_model
daniel-sanche Sep 9, 2025
4ccfdab
removed duplicate import
daniel-sanche Sep 9, 2025
bd9ab70
added more tests
daniel-sanche Sep 9, 2025
50b3e48
remove operation metadata key
daniel-sanche Sep 9, 2025
2b35127
assign metadata directly
daniel-sanche Sep 9, 2025
9cbda99
added test
daniel-sanche Sep 9, 2025
73f4b3c
replace details mocks with real type
daniel-sanche Sep 9, 2025
d5e012d
strip operation id from metadata before request
daniel-sanche Sep 9, 2025
486068b
added try; generated sync
daniel-sanche Sep 9, 2025
bebeb70
use contextvars
daniel-sanche Sep 30, 2025
5ba2bbe
pulled in improvements to data model
daniel-sanche Sep 30, 2025
4098fd9
removed cancel from spec
daniel-sanche Sep 30, 2025
e8785ac
Merge branch 'csm_interceptor' into csm_1_data_model
daniel-sanche Sep 30, 2025
c1cc24d
fixed tests
daniel-sanche Sep 30, 2025
85d4cf0
Merge branch 'csm_interceptor' into csm_1_data_model
daniel-sanche Sep 30, 2025
ed9d3cf
fixed lint
daniel-sanche Oct 1, 2025
bc6036e
added close to metric spec
daniel-sanche Oct 2, 2025
18ec330
fixed lint
daniel-sanche Oct 2, 2025
f1be54a
added test
daniel-sanche Oct 3, 2025
4 changes: 4 additions & 0 deletions google/cloud/bigtable/data/_async/client.py
@@ -85,6 +85,7 @@
from google.cloud.bigtable.data.row_filters import StripValueTransformerFilter
from google.cloud.bigtable.data.row_filters import CellsRowLimitFilter
from google.cloud.bigtable.data.row_filters import RowFilterChain
from google.cloud.bigtable.data._metrics import BigtableClientSideMetricsController

from google.cloud.bigtable.data._cross_sync import CrossSync

@@ -969,6 +970,8 @@ def __init__(
default_retryable_errors or ()
)

self._metrics = BigtableClientSideMetricsController()

try:
self._register_instance_future = CrossSync.create_task(
self.client._register_instance,
@@ -1682,6 +1685,7 @@ async def close(self):
"""
Called to close the Table instance and release any resources held by it.
"""
self._metrics.close()
if self._register_instance_future:
self._register_instance_future.cancel()
await self.client._remove_instance_registration(self.instance_id, self)
108 changes: 101 additions & 7 deletions google/cloud/bigtable/data/_async/metrics_interceptor.py
@@ -13,11 +13,22 @@
# limitations under the License
from __future__ import annotations

from typing import Sequence

import time
from functools import wraps

from google.cloud.bigtable.data._metrics.data_model import ActiveOperationMetric
from google.cloud.bigtable.data._metrics.data_model import OperationState
from google.cloud.bigtable.data._metrics.data_model import OperationType
from google.cloud.bigtable.data._metrics.handlers._base import MetricsHandler

from google.cloud.bigtable.data._cross_sync import CrossSync

if CrossSync.is_async:
from grpc.aio import UnaryUnaryClientInterceptor
from grpc.aio import UnaryStreamClientInterceptor
from grpc.aio import AioRpcError
else:
from grpc import UnaryUnaryClientInterceptor
from grpc import UnaryStreamClientInterceptor
@@ -26,30 +26,93 @@
__CROSS_SYNC_OUTPUT__ = "google.cloud.bigtable.data._sync_autogen.metrics_interceptor"


def _with_operation_from_metadata(func):
"""
Decorator for interceptor methods to extract the active operation
from metadata and pass it to the decorated function.
"""

@wraps(func)
def wrapper(self, continuation, client_call_details, request):
operation: "ActiveOperationMetric" | None = ActiveOperationMetric.get_active()

if operation:
# start a new attempt if not started
if (
operation.state == OperationState.CREATED
or operation.state == OperationState.BETWEEN_ATTEMPTS
):
operation.start_attempt()
# wrap continuation in logic to process the operation
return func(self, operation, continuation, client_call_details, request)
else:
# if operation not found, return unwrapped continuation
return continuation(client_call_details, request)

return wrapper
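
Note: the decorator above depends on ActiveOperationMetric.get_active() to locate the operation for the current call. That method lives in data_model.py, which this diff does not show; given the "use contextvars" commit above, a plausible sketch of the pattern it relies on, with illustrative names:

import contextvars

# hypothetical: a module-level ContextVar bound by the retry loop that owns
# the operation, so the interceptor can find it without threading arguments
_ACTIVE_OP: contextvars.ContextVar["ActiveOperationMetric | None"] = (
    contextvars.ContextVar("bigtable_active_operation", default=None)
)

def get_active_sketch():
    # returns the operation bound to the current (async) execution context,
    # or None when the RPC was not issued under an instrumented operation
    return _ACTIVE_OP.get()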


@CrossSync.convert
async def _get_metadata(source) -> dict[str, str | bytes] | None:
"""Helper to extract metadata from a call or RpcError"""
try:
metadata: Sequence[tuple[str, str | bytes]]
if CrossSync.is_async:
# grpc.aio returns metadata in Metadata objects
if isinstance(source, AioRpcError):
metadata = list(source.trailing_metadata()) + list(
source.initial_metadata()
)
else:
metadata = list(await source.trailing_metadata()) + list(
await source.initial_metadata()
)
else:
# sync grpc returns metadata as a sequence of tuples
metadata = source.trailing_metadata() + source.initial_metadata()
# convert metadata to dict format
return {k: v for (k, v) in metadata}
except Exception:
# ignore errors while fetching metadata
return None


@CrossSync.convert_class(sync_name="BigtableMetricsInterceptor")
class AsyncBigtableMetricsInterceptor(
UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor
UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor, MetricsHandler
):
"""
An async gRPC interceptor that records client-side metrics, capturing response metadata from intercepted RPCs.
"""

@CrossSync.convert
async def intercept_unary_unary(self, continuation, client_call_details, request):
@_with_operation_from_metadata
async def intercept_unary_unary(
self, operation, continuation, client_call_details, request
):
"""
Interceptor for unary rpcs:
- MutateRow
- CheckAndMutateRow
- ReadModifyWriteRow
"""
metadata = None
try:
call = await continuation(client_call_details, request)
metadata = await _get_metadata(call)
return call
except Exception as rpc_error:
metadata = await _get_metadata(rpc_error)
raise rpc_error
finally:
if metadata is not None:
operation.add_response_metadata(metadata)

@CrossSync.convert
async def intercept_unary_stream(self, continuation, client_call_details, request):
@_with_operation_from_metadata
async def intercept_unary_stream(
self, operation, continuation, client_call_details, request
):
"""
Interceptor for streaming rpcs:
- ReadRows
@@ -58,21 +58,41 @@ async def intercept_unary_stream(self, continuation, client_call_details, request):
"""
try:
return self._streaming_generator_wrapper(
await continuation(client_call_details, request)
operation, await continuation(client_call_details, request)
)
except Exception as rpc_error:
# handle errors while initializing stream
metadata = await _get_metadata(rpc_error)
if metadata is not None:
operation.add_response_metadata(metadata)
raise rpc_error

@staticmethod
@CrossSync.convert
async def _streaming_generator_wrapper(call):
async def _streaming_generator_wrapper(operation, call):
"""
Wrapped generator to be returned by intercept_unary_stream
"""
# only track first-response latency for READ_ROWS
has_first_response = (
operation.first_response_latency_ns is not None
or operation.op_type != OperationType.READ_ROWS
)
encountered_exc = None
try:
async for response in call:
# record time to first response; currently only used for READ_ROWS
if not has_first_response:
operation.first_response_latency_ns = (
time.monotonic_ns() - operation.start_time_ns
)
has_first_response = True
yield response
except Exception as e:
# handle errors while processing stream
raise e
encountered_exc = e
raise
finally:
if call is not None:
metadata = await _get_metadata(encountered_exc or call)
if metadata is not None:
operation.add_response_metadata(metadata)
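
For orientation, a client interceptor like this plugs into gRPC at channel construction; the grpc.aio channel factories accept an interceptors argument. A minimal sketch (the target below is illustrative; the real wiring happens in the client's channel setup, per the swappable-channel commits above):

import grpc

def make_instrumented_channel(target: str = "localhost:8086") -> grpc.aio.Channel:
    interceptor = AsyncBigtableMetricsInterceptor()
    # every unary-unary and unary-stream RPC on this channel now flows
    # through the metric hooks defined above
    return grpc.aio.insecure_channel(target, interceptors=[interceptor])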
59 changes: 59 additions & 0 deletions google/cloud/bigtable/data/_helpers.py
@@ -23,6 +23,7 @@
from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery

from google.api_core import exceptions as core_exceptions
from google.api_core.retry import exponential_sleep_generator
from google.api_core.retry import RetryFailureReason
from google.cloud.bigtable.data.exceptions import RetryExceptionGroup

@@ -248,3 +249,61 @@ def _get_retryable_errors(
call_codes = table.default_mutate_rows_retryable_errors

return [_get_error_type(e) for e in call_codes]


class TrackedBackoffGenerator:
"""
Generator class for exponential backoff sleep times.
This implementation builds on top of api_core.retry.exponential_sleep_generator,
adding the ability to retrieve previous values using get_attempt_backoff(idx).
This is used by the metrics classes to track the sleep time used for each attempt.
"""

def __init__(self, initial=0.01, maximum=60, multiplier=2):
self.history = []
self.subgenerator = exponential_sleep_generator(
initial=initial, maximum=maximum, multiplier=multiplier
)
self._next_override: float | None = None

def __iter__(self):
return self

def set_next(self, next_value: float):
"""
Set the next backoff value to yield, instead of generating one from the
subgenerator. After the value is yielded, generation goes back to using
self.subgenerator.

If set_next is called twice before next() is called, only the latest
value is used; earlier overrides are discarded.

Args:
    next_value: the upcoming value to yield when next() is called
Raises:
ValueError: if next_value is negative
"""
if next_value < 0:
raise ValueError("backoff value cannot be less than 0")
self._next_override = next_value

def __next__(self) -> float:
if self._next_override is not None:
next_backoff = self._next_override
self._next_override = None
else:
next_backoff = next(self.subgenerator)
self.history.append(next_backoff)
return next_backoff

def get_attempt_backoff(self, attempt_idx) -> float:
"""
Returns the backoff time for a specific attempt index, starting at 0.

Args:
attempt_idx: the index of the attempt to return backoff for
Raises:
IndexError: if attempt_idx is negative, or not in history
"""
if attempt_idx < 0:
raise IndexError("received negative attempt number")
return self.history[attempt_idx]
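
Illustrative usage of TrackedBackoffGenerator (editor's example, not part of the diff), showing that every yielded value lands in history and that set_next overrides exactly one upcoming value:

backoff = TrackedBackoffGenerator(initial=0.01, maximum=60, multiplier=2)

first = next(backoff)   # jittered exponential value, recorded in history
backoff.set_next(0.5)   # force the next yielded value
second = next(backoff)  # 0.5 exactly; later values resume the subgenerator

assert backoff.get_attempt_backoff(0) == first
assert backoff.get_attempt_backoff(1) == 0.5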
31 changes: 31 additions & 0 deletions google/cloud/bigtable/data/_metrics/__init__.py
@@ -0,0 +1,31 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from google.cloud.bigtable.data._metrics.metrics_controller import (
BigtableClientSideMetricsController,
)

from google.cloud.bigtable.data._metrics.data_model import OperationType
from google.cloud.bigtable.data._metrics.data_model import ActiveOperationMetric
from google.cloud.bigtable.data._metrics.data_model import ActiveAttemptMetric
from google.cloud.bigtable.data._metrics.data_model import CompletedOperationMetric
from google.cloud.bigtable.data._metrics.data_model import CompletedAttemptMetric

__all__ = (
"BigtableClientSideMetricsController",
"OperationType",
"ActiveOperationMetric",
"ActiveAttemptMetric",
"CompletedOperationMetric",
"CompletedAttemptMetric",
)