From 07b32959adc0ff8bb01fd976e22e9bbc41bdd9cc Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 9 Sep 2025 13:53:44 -0700 Subject: [PATCH 01/12] feat: added metrics interceptor --- google/cloud/bigtable/data/_async/client.py | 41 ++-- .../data/_async/metrics_interceptor.py | 85 +++++++++ .../bigtable/data/_sync_autogen/client.py | 34 +++- .../data/_sync_autogen/metrics_interceptor.py | 71 +++++++ .../data/_async/test_metrics_interceptor.py | 175 ++++++++++++++++++ .../_sync_autogen/test_metrics_interceptor.py | 147 +++++++++++++++ 6 files changed, 531 insertions(+), 22 deletions(-) create mode 100644 google/cloud/bigtable/data/_async/metrics_interceptor.py create mode 100644 google/cloud/bigtable/data/_sync_autogen/metrics_interceptor.py create mode 100644 tests/unit/data/_async/test_metrics_interceptor.py create mode 100644 tests/unit/data/_sync_autogen/test_metrics_interceptor.py diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index 40f30f1d8..7cf7e8625 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -97,18 +97,24 @@ ) from google.cloud.bigtable.data._async.mutations_batcher import _MB_SIZE from google.cloud.bigtable.data._async._swappable_channel import ( - AsyncSwappableChannel, + AsyncSwappableChannel as SwappableChannelType, + ) + from google.cloud.bigtable.data._async.metrics_interceptor import ( + AsyncBigtableMetricsInterceptor as MetricInterceptorType, ) else: from typing import Iterable # noqa: F401 from grpc import insecure_channel + from grpc import intercept_channel from google.cloud.bigtable_v2.services.bigtable.transports import BigtableGrpcTransport as TransportType # type: ignore from google.cloud.bigtable_v2.services.bigtable import BigtableClient as GapicClient # type: ignore from google.cloud.bigtable.data._sync_autogen.mutations_batcher import _MB_SIZE from google.cloud.bigtable.data._sync_autogen._swappable_channel import ( # noqa: F401 - SwappableChannel, + SwappableChannel as SwappableChannelType, + ) + from google.cloud.bigtable.data._sync_autogen.metrics_interceptor import ( # noqa: F401 + BigtableMetricsInterceptor as MetricInterceptorType, ) - if TYPE_CHECKING: from google.cloud.bigtable.data._helpers import RowKeySamples @@ -203,7 +209,7 @@ def __init__( credentials = google.auth.credentials.AnonymousCredentials() if project is None: project = _DEFAULT_BIGTABLE_EMULATOR_CLIENT - + self._metrics_interceptor = MetricInterceptorType() # initialize client ClientWithProject.__init__( self, @@ -257,12 +263,11 @@ def __init__( stacklevel=2, ) - @CrossSync.convert(replace_symbols={"AsyncSwappableChannel": "SwappableChannel"}) - def _build_grpc_channel(self, *args, **kwargs) -> AsyncSwappableChannel: + def _build_grpc_channel(self, *args, **kwargs) -> SwappableChannelType: """ This method is called by the gapic transport to create a grpc channel. - The init arguments passed down are captured in a partial used by AsyncSwappableChannel + The init arguments passed down are captured in a partial used by SwappableChannel to create new channel instances in the future, as part of the channel refresh logic Emulators always use an inseucre channel @@ -276,9 +281,22 @@ def _build_grpc_channel(self, *args, **kwargs) -> AsyncSwappableChannel: if self._emulator_host is not None: # emulators use insecure channel create_channel_fn = partial(insecure_channel, self._emulator_host) - else: + elif CrossSync.is_async: create_channel_fn = partial(TransportType.create_channel, *args, **kwargs) - return AsyncSwappableChannel(create_channel_fn) + else: + # attach sync interceptors in create_channel_fn + def create_channel_fn(): + return intercept_channel( + TransportType.create_channel(*args, **kwargs), + self._metrics_interceptor, + ) + + new_channel = SwappableChannelType(create_channel_fn) + if CrossSync.is_async: + # attach async interceptors + new_channel._unary_unary_interceptors.append(self._metrics_interceptor) + new_channel._unary_stream_interceptors.append(self._metrics_interceptor) + return new_channel @property def universe_domain(self) -> str: @@ -400,7 +418,6 @@ def _invalidate_channel_stubs(self): self.transport._stubs = {} self.transport._prep_wrapped_messages(self.client_info) - @CrossSync.convert(replace_symbols={"AsyncSwappableChannel": "SwappableChannel"}) async def _manage_channel( self, refresh_interval_min: float = 60 * 35, @@ -425,10 +442,10 @@ async def _manage_channel( grace_period: time to allow previous channel to serve existing requests before closing, in seconds """ - if not isinstance(self.transport.grpc_channel, AsyncSwappableChannel): + if not isinstance(self.transport.grpc_channel, SwappableChannelType): warnings.warn("Channel does not support auto-refresh.") return - super_channel: AsyncSwappableChannel = self.transport.grpc_channel + super_channel: SwappableChannelType = self.transport.grpc_channel first_refresh = self._channel_init_time + random.uniform( refresh_interval_min, refresh_interval_max ) diff --git a/google/cloud/bigtable/data/_async/metrics_interceptor.py b/google/cloud/bigtable/data/_async/metrics_interceptor.py new file mode 100644 index 000000000..6940926e7 --- /dev/null +++ b/google/cloud/bigtable/data/_async/metrics_interceptor.py @@ -0,0 +1,85 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License +from __future__ import annotations + +from google.cloud.bigtable.data._cross_sync import CrossSync + +if CrossSync.is_async: + from grpc.aio import UnaryUnaryClientInterceptor + from grpc.aio import UnaryStreamClientInterceptor +else: + from grpc import UnaryUnaryClientInterceptor + from grpc import UnaryStreamClientInterceptor + + +__CROSS_SYNC_OUTPUT__ = "google.cloud.bigtable.data._sync_autogen.metrics_interceptor" + + +@CrossSync.convert_class(sync_name="BigtableMetricsInterceptor") +class AsyncBigtableMetricsInterceptor( + UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor +): + """ + An async gRPC interceptor to add client metadata and print server metadata. + """ + + def __init__(self): + super().__init__() + self.operation_map = {} + + def register_operation(self, operation): + """ + Register an operation object to be tracked my the interceptor + + When registered, the operation will receive metadata updates: + - start_attempt if attempt not started when rpc is being sent + - add_response_metadata after call is complete + - end_attempt_with_status if attempt receives an error + + The interceptor will register itself as a handeler for the operation, + so it can unregister the operation when it is complete + """ + self.operation_map[operation.uuid] = operation + operation.handlers.append(self) + + def on_operation_complete(self, op): + if op.uuid in self.operation_map: + del self.operation_map[op.uuid] + + def on_operation_cancelled(self, op): + self.on_operation_complete(op) + + @CrossSync.convert + async def intercept_unary_unary(self, continuation, client_call_details, request): + try: + call = await continuation(client_call_details, request) + return call + except Exception as rpc_error: + raise rpc_error + + @CrossSync.convert + async def intercept_unary_stream(self, continuation, client_call_details, request): + async def response_wrapper(call): + try: + async for response in call: + yield response + except Exception as e: + # handle errors while processing stream + raise e + + try: + return response_wrapper(await continuation(client_call_details, request)) + except Exception as rpc_error: + # handle errors while intializing stream + raise rpc_error diff --git a/google/cloud/bigtable/data/_sync_autogen/client.py b/google/cloud/bigtable/data/_sync_autogen/client.py index 1c75823ae..645a5fe08 100644 --- a/google/cloud/bigtable/data/_sync_autogen/client.py +++ b/google/cloud/bigtable/data/_sync_autogen/client.py @@ -75,12 +75,18 @@ from google.cloud.bigtable.data._cross_sync import CrossSync from typing import Iterable from grpc import insecure_channel +from grpc import intercept_channel from google.cloud.bigtable_v2.services.bigtable.transports import ( BigtableGrpcTransport as TransportType, ) from google.cloud.bigtable_v2.services.bigtable import BigtableClient as GapicClient from google.cloud.bigtable.data._sync_autogen.mutations_batcher import _MB_SIZE -from google.cloud.bigtable.data._sync_autogen._swappable_channel import SwappableChannel +from google.cloud.bigtable.data._sync_autogen._swappable_channel import ( + SwappableChannel as SwappableChannelType, +) +from google.cloud.bigtable.data._sync_autogen.metrics_interceptor import ( + BigtableMetricsInterceptor as MetricInterceptorType, +) if TYPE_CHECKING: from google.cloud.bigtable.data._helpers import RowKeySamples @@ -143,6 +149,7 @@ def __init__( credentials = google.auth.credentials.AnonymousCredentials() if project is None: project = _DEFAULT_BIGTABLE_EMULATOR_CLIENT + self._metrics_interceptor = MetricInterceptorType() ClientWithProject.__init__( self, credentials=credentials, @@ -186,7 +193,7 @@ def __init__( stacklevel=2, ) - def _build_grpc_channel(self, *args, **kwargs) -> SwappableChannel: + def _build_grpc_channel(self, *args, **kwargs) -> SwappableChannelType: """This method is called by the gapic transport to create a grpc channel. The init arguments passed down are captured in a partial used by SwappableChannel @@ -202,8 +209,15 @@ def _build_grpc_channel(self, *args, **kwargs) -> SwappableChannel: if self._emulator_host is not None: create_channel_fn = partial(insecure_channel, self._emulator_host) else: - create_channel_fn = partial(TransportType.create_channel, *args, **kwargs) - return SwappableChannel(create_channel_fn) + + def create_channel_fn(): + return intercept_channel( + TransportType.create_channel(*args, **kwargs), + self._metrics_interceptor, + ) + + new_channel = SwappableChannelType(create_channel_fn) + return new_channel @property def universe_domain(self) -> str: @@ -302,7 +316,7 @@ def _invalidate_channel_stubs(self): self.transport._stubs = {} self.transport._prep_wrapped_messages(self.client_info) - def _manage_channel( + async def _manage_channel( self, refresh_interval_min: float = 60 * 35, refresh_interval_max: float = 60 * 45, @@ -324,25 +338,25 @@ def _manage_channel( between `refresh_interval_min` and `refresh_interval_max` grace_period: time to allow previous channel to serve existing requests before closing, in seconds""" - if not isinstance(self.transport.grpc_channel, SwappableChannel): + if not isinstance(self.transport.grpc_channel, SwappableChannelType): warnings.warn("Channel does not support auto-refresh.") return - super_channel: SwappableChannel = self.transport.grpc_channel + super_channel: SwappableChannelType = self.transport.grpc_channel first_refresh = self._channel_init_time + random.uniform( refresh_interval_min, refresh_interval_max ) next_sleep = max(first_refresh - time.monotonic(), 0) if next_sleep > 0: - self._ping_and_warm_instances(channel=super_channel) + await self._ping_and_warm_instances(channel=super_channel) while not self._is_closed.is_set(): - CrossSync._Sync_Impl.event_wait( + await CrossSync._Sync_Impl.event_wait( self._is_closed, next_sleep, async_break_early=False ) if self._is_closed.is_set(): break start_timestamp = time.monotonic() new_channel = super_channel.create_channel() - self._ping_and_warm_instances(channel=new_channel) + await self._ping_and_warm_instances(channel=new_channel) old_channel = super_channel.swap_channel(new_channel) self._invalidate_channel_stubs() if grace_period: diff --git a/google/cloud/bigtable/data/_sync_autogen/metrics_interceptor.py b/google/cloud/bigtable/data/_sync_autogen/metrics_interceptor.py new file mode 100644 index 000000000..bda4211b1 --- /dev/null +++ b/google/cloud/bigtable/data/_sync_autogen/metrics_interceptor.py @@ -0,0 +1,71 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License + +# This file is automatically generated by CrossSync. Do not edit manually. + +from __future__ import annotations +from grpc import UnaryUnaryClientInterceptor +from grpc import UnaryStreamClientInterceptor + + +class BigtableMetricsInterceptor( + UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor +): + """ + An async gRPC interceptor to add client metadata and print server metadata. + """ + + def __init__(self): + super().__init__() + self.operation_map = {} + + def register_operation(self, operation): + """Register an operation object to be tracked my the interceptor + + When registered, the operation will receive metadata updates: + - start_attempt if attempt not started when rpc is being sent + - add_response_metadata after call is complete + - end_attempt_with_status if attempt receives an error + + The interceptor will register itself as a handeler for the operation, + so it can unregister the operation when it is complete""" + self.operation_map[operation.uuid] = operation + operation.handlers.append(self) + + def on_operation_complete(self, op): + if op.uuid in self.operation_map: + del self.operation_map[op.uuid] + + def on_operation_cancelled(self, op): + self.on_operation_complete(op) + + def intercept_unary_unary(self, continuation, client_call_details, request): + try: + call = continuation(client_call_details, request) + return call + except Exception as rpc_error: + raise rpc_error + + def intercept_unary_stream(self, continuation, client_call_details, request): + def response_wrapper(call): + try: + for response in call: + yield response + except Exception as e: + raise e + + try: + return response_wrapper(continuation(client_call_details, request)) + except Exception as rpc_error: + raise rpc_error diff --git a/tests/unit/data/_async/test_metrics_interceptor.py b/tests/unit/data/_async/test_metrics_interceptor.py new file mode 100644 index 000000000..1c883eda5 --- /dev/null +++ b/tests/unit/data/_async/test_metrics_interceptor.py @@ -0,0 +1,175 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +from grpc import RpcError + +from google.cloud.bigtable.data._cross_sync import CrossSync + +# try/except added for compatibility with python < 3.8 +try: + from unittest import mock +except ImportError: # pragma: NO COVER + import mock # type: ignore + +if CrossSync.is_async: + from google.cloud.bigtable.data._async.metrics_interceptor import ( + AsyncBigtableMetricsInterceptor, + ) +else: + from google.cloud.bigtable.data._sync_autogen.metrics_interceptor import ( # noqa: F401 + BigtableMetricsInterceptor, + ) + + +__CROSS_SYNC_OUTPUT__ = "tests.unit.data._sync_autogen.test_metrics_interceptor" + + +@CrossSync.convert(replace_symbols={"__aiter__": "__iter__"}) +def _make_mock_stream_call(values, exc=None): + """ + Create a mock call object that can be used for streaming calls + """ + call = CrossSync.Mock() + + async def gen(): + for val in values: + yield val + if exc: + raise exc + + call.__aiter__ = mock.Mock(return_value=gen()) + return call + + +@CrossSync.convert_class(sync_name="TestMetricsInterceptor") +class TestMetricsInterceptorAsync: + @staticmethod + @CrossSync.convert( + replace_symbols={ + "AsyncBigtableMetricsInterceptor": "BigtableMetricsInterceptor" + } + ) + def _get_target_class(): + return AsyncBigtableMetricsInterceptor + + def _make_one(self, *args, **kwargs): + return self._get_target_class()(*args, **kwargs) + + def test_ctor(self): + instance = self._make_one() + assert instance.operation_map == {} + + @CrossSync.pytest + async def test_unary_unary_interceptor_success(self): + """Test that interceptor handles successful unary-unary calls""" + instance = self._make_one() + continuation = CrossSync.Mock() + call = continuation.return_value + details = mock.Mock() + request = mock.Mock() + result = await instance.intercept_unary_unary(continuation, details, request) + assert result == call + continuation.assert_called_once_with(details, request) + + @CrossSync.pytest + async def test_unary_unary_interceptor_failure(self): + """test a failed RpcError with metadata""" + + instance = self._make_one() + exc = RpcError("test") + continuation = CrossSync.Mock(side_effect=exc) + details = mock.Mock() + request = mock.Mock() + with pytest.raises(RpcError) as e: + await instance.intercept_unary_unary(continuation, details, request) + assert e.value == exc + continuation.assert_called_once_with(details, request) + + @CrossSync.pytest + async def test_unary_unary_interceptor_failure_generic(self): + """test generic exception""" + + instance = self._make_one() + exc = ValueError("test") + continuation = CrossSync.Mock(side_effect=exc) + call = continuation.return_value + details = mock.Mock() + request = mock.Mock() + with pytest.raises(ValueError) as e: + await instance.intercept_unary_unary(continuation, details, request) + assert e.value == exc + continuation.assert_called_once_with(details, request) + + @CrossSync.pytest + async def test_unary_stream_interceptor_success(self): + """Test that interceptor handles successful unary-stream calls""" + + instance = self._make_one() + + continuation = CrossSync.Mock(return_value=_make_mock_stream_call([1, 2])) + call = continuation.return_value + details = mock.Mock() + request = mock.Mock() + wrapper = await instance.intercept_unary_stream(continuation, details, request) + results = [val async for val in wrapper] + assert results == [1, 2] + continuation.assert_called_once_with(details, request) + + @CrossSync.pytest + async def test_unary_stream_interceptor_failure_mid_stream(self): + """Test that interceptor handles failures mid-stream""" + instance = self._make_one() + exc = ValueError("test") + continuation = CrossSync.Mock(return_value=_make_mock_stream_call([1], exc=exc)) + call = continuation.return_value + details = mock.Mock() + request = mock.Mock() + wrapper = await instance.intercept_unary_stream(continuation, details, request) + with pytest.raises(ValueError) as e: + [val async for val in wrapper] + assert e.value == exc + continuation.assert_called_once_with(details, request) + + @CrossSync.pytest + async def test_unary_stream_interceptor_failure_start_stream(self): + """Test that interceptor handles failures at start of stream with RpcError with metadata""" + + instance = self._make_one() + exc = RpcError("test") + + continuation = CrossSync.Mock() + continuation.side_effect = exc + details = mock.Mock() + request = mock.Mock() + with pytest.raises(RpcError) as e: + await instance.intercept_unary_stream(continuation, details, request) + assert e.value == exc + continuation.assert_called_once_with(details, request) + + @CrossSync.pytest + async def test_unary_stream_interceptor_failure_start_stream_generic(self): + """Test that interceptor handles failures at start of stream with generic exception""" + + instance = self._make_one() + exc = ValueError("test") + + continuation = CrossSync.Mock() + continuation.side_effect = exc + details = mock.Mock() + request = mock.Mock() + with pytest.raises(ValueError) as e: + await instance.intercept_unary_stream(continuation, details, request) + assert e.value == exc + continuation.assert_called_once_with(details, request) \ No newline at end of file diff --git a/tests/unit/data/_sync_autogen/test_metrics_interceptor.py b/tests/unit/data/_sync_autogen/test_metrics_interceptor.py new file mode 100644 index 000000000..596d3306e --- /dev/null +++ b/tests/unit/data/_sync_autogen/test_metrics_interceptor.py @@ -0,0 +1,147 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# This file is automatically generated by CrossSync. Do not edit manually. + +import pytest +from grpc import RpcError +from google.cloud.bigtable.data._cross_sync import CrossSync + +try: + from unittest import mock +except ImportError: + import mock +from google.cloud.bigtable.data._sync_autogen.metrics_interceptor import ( + BigtableMetricsInterceptor, +) + + +def _make_mock_stream_call(values, exc=None): + """Create a mock call object that can be used for streaming calls""" + call = CrossSync._Sync_Impl.Mock() + + def gen(): + for val in values: + yield val + if exc: + raise exc + + call.__iter__ = mock.Mock(return_value=gen()) + return call + + +class TestMetricsInterceptor: + @staticmethod + def _get_target_class(): + return BigtableMetricsInterceptor + + def _make_one(self, *args, **kwargs): + return self._get_target_class()(*args, **kwargs) + + def test_ctor(self): + instance = self._make_one() + assert instance.operation_map == {} + + def test_unary_unary_interceptor_success(self): + """Test that interceptor handles successful unary-unary calls""" + instance = self._make_one() + continuation = CrossSync._Sync_Impl.Mock() + call = continuation.return_value + details = mock.Mock() + request = mock.Mock() + result = instance.intercept_unary_unary(continuation, details, request) + assert result == call + continuation.assert_called_once_with(details, request) + + def test_unary_unary_interceptor_failure(self): + """test a failed RpcError with metadata""" + instance = self._make_one() + exc = RpcError("test") + continuation = CrossSync._Sync_Impl.Mock(side_effect=exc) + details = mock.Mock() + request = mock.Mock() + with pytest.raises(RpcError) as e: + instance.intercept_unary_unary(continuation, details, request) + assert e.value == exc + continuation.assert_called_once_with(details, request) + + def test_unary_unary_interceptor_failure_generic(self): + """test generic exception""" + instance = self._make_one() + exc = ValueError("test") + continuation = CrossSync._Sync_Impl.Mock(side_effect=exc) + call = continuation.return_value + details = mock.Mock() + request = mock.Mock() + with pytest.raises(ValueError) as e: + instance.intercept_unary_unary(continuation, details, request) + assert e.value == exc + continuation.assert_called_once_with(details, request) + + def test_unary_stream_interceptor_success(self): + """Test that interceptor handles successful unary-stream calls""" + instance = self._make_one() + continuation = CrossSync._Sync_Impl.Mock( + return_value=_make_mock_stream_call([1, 2]) + ) + call = continuation.return_value + details = mock.Mock() + request = mock.Mock() + wrapper = instance.intercept_unary_stream(continuation, details, request) + results = [val for val in wrapper] + assert results == [1, 2] + continuation.assert_called_once_with(details, request) + + def test_unary_stream_interceptor_failure_mid_stream(self): + """Test that interceptor handles failures mid-stream""" + instance = self._make_one() + exc = ValueError("test") + continuation = CrossSync._Sync_Impl.Mock( + return_value=_make_mock_stream_call([1], exc=exc) + ) + call = continuation.return_value + details = mock.Mock() + request = mock.Mock() + wrapper = instance.intercept_unary_stream(continuation, details, request) + with pytest.raises(ValueError) as e: + [val for val in wrapper] + assert e.value == exc + continuation.assert_called_once_with(details, request) + + def test_unary_stream_interceptor_failure_start_stream(self): + """Test that interceptor handles failures at start of stream with RpcError with metadata""" + instance = self._make_one() + exc = RpcError("test") + continuation = CrossSync._Sync_Impl.Mock() + continuation.side_effect = exc + details = mock.Mock() + request = mock.Mock() + with pytest.raises(RpcError) as e: + instance.intercept_unary_stream(continuation, details, request) + assert e.value == exc + continuation.assert_called_once_with(details, request) + + def test_unary_stream_interceptor_failure_start_stream_generic(self): + """Test that interceptor handles failures at start of stream with generic exception""" + instance = self._make_one() + exc = ValueError("test") + continuation = CrossSync._Sync_Impl.Mock() + continuation.side_effect = exc + details = mock.Mock() + request = mock.Mock() + with pytest.raises(ValueError) as e: + instance.intercept_unary_stream(continuation, details, request) + assert e.value == exc + continuation.assert_called_once_with(details, request) From e3ac1318e5cda4763819efce6229f8295d0fb721 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 9 Sep 2025 13:56:21 -0700 Subject: [PATCH 02/12] pulled out operation logic --- .../data/_async/metrics_interceptor.py | 26 ------------------- .../data/_sync_autogen/metrics_interceptor.py | 24 ----------------- 2 files changed, 50 deletions(-) diff --git a/google/cloud/bigtable/data/_async/metrics_interceptor.py b/google/cloud/bigtable/data/_async/metrics_interceptor.py index 6940926e7..10bed46ca 100644 --- a/google/cloud/bigtable/data/_async/metrics_interceptor.py +++ b/google/cloud/bigtable/data/_async/metrics_interceptor.py @@ -34,32 +34,6 @@ class AsyncBigtableMetricsInterceptor( An async gRPC interceptor to add client metadata and print server metadata. """ - def __init__(self): - super().__init__() - self.operation_map = {} - - def register_operation(self, operation): - """ - Register an operation object to be tracked my the interceptor - - When registered, the operation will receive metadata updates: - - start_attempt if attempt not started when rpc is being sent - - add_response_metadata after call is complete - - end_attempt_with_status if attempt receives an error - - The interceptor will register itself as a handeler for the operation, - so it can unregister the operation when it is complete - """ - self.operation_map[operation.uuid] = operation - operation.handlers.append(self) - - def on_operation_complete(self, op): - if op.uuid in self.operation_map: - del self.operation_map[op.uuid] - - def on_operation_cancelled(self, op): - self.on_operation_complete(op) - @CrossSync.convert async def intercept_unary_unary(self, continuation, client_call_details, request): try: diff --git a/google/cloud/bigtable/data/_sync_autogen/metrics_interceptor.py b/google/cloud/bigtable/data/_sync_autogen/metrics_interceptor.py index bda4211b1..46dcd088f 100644 --- a/google/cloud/bigtable/data/_sync_autogen/metrics_interceptor.py +++ b/google/cloud/bigtable/data/_sync_autogen/metrics_interceptor.py @@ -26,30 +26,6 @@ class BigtableMetricsInterceptor( An async gRPC interceptor to add client metadata and print server metadata. """ - def __init__(self): - super().__init__() - self.operation_map = {} - - def register_operation(self, operation): - """Register an operation object to be tracked my the interceptor - - When registered, the operation will receive metadata updates: - - start_attempt if attempt not started when rpc is being sent - - add_response_metadata after call is complete - - end_attempt_with_status if attempt receives an error - - The interceptor will register itself as a handeler for the operation, - so it can unregister the operation when it is complete""" - self.operation_map[operation.uuid] = operation - operation.handlers.append(self) - - def on_operation_complete(self, op): - if op.uuid in self.operation_map: - del self.operation_map[op.uuid] - - def on_operation_cancelled(self, op): - self.on_operation_complete(op) - def intercept_unary_unary(self, continuation, client_call_details, request): try: call = continuation(client_call_details, request) From 557b54a1db5cf15cae86116955ec336066dc3be5 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Tue, 9 Sep 2025 21:00:14 +0000 Subject: [PATCH 03/12] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- tests/unit/data/_async/test_metrics_interceptor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/data/_async/test_metrics_interceptor.py b/tests/unit/data/_async/test_metrics_interceptor.py index 1c883eda5..99d6d5a5e 100644 --- a/tests/unit/data/_async/test_metrics_interceptor.py +++ b/tests/unit/data/_async/test_metrics_interceptor.py @@ -172,4 +172,4 @@ async def test_unary_stream_interceptor_failure_start_stream_generic(self): with pytest.raises(ValueError) as e: await instance.intercept_unary_stream(continuation, details, request) assert e.value == exc - continuation.assert_called_once_with(details, request) \ No newline at end of file + continuation.assert_called_once_with(details, request) From 3306ad735a9eefae35eb190ecf85879987d31d1a Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 9 Sep 2025 14:24:23 -0700 Subject: [PATCH 04/12] fixed missing convert --- google/cloud/bigtable/data/_async/client.py | 1 + google/cloud/bigtable/data/_sync_autogen/client.py | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index 7cf7e8625..2772198ca 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -418,6 +418,7 @@ def _invalidate_channel_stubs(self): self.transport._stubs = {} self.transport._prep_wrapped_messages(self.client_info) + @CrossSync.convert async def _manage_channel( self, refresh_interval_min: float = 60 * 35, diff --git a/google/cloud/bigtable/data/_sync_autogen/client.py b/google/cloud/bigtable/data/_sync_autogen/client.py index 645a5fe08..56823d693 100644 --- a/google/cloud/bigtable/data/_sync_autogen/client.py +++ b/google/cloud/bigtable/data/_sync_autogen/client.py @@ -316,7 +316,7 @@ def _invalidate_channel_stubs(self): self.transport._stubs = {} self.transport._prep_wrapped_messages(self.client_info) - async def _manage_channel( + def _manage_channel( self, refresh_interval_min: float = 60 * 35, refresh_interval_max: float = 60 * 45, @@ -347,16 +347,16 @@ async def _manage_channel( ) next_sleep = max(first_refresh - time.monotonic(), 0) if next_sleep > 0: - await self._ping_and_warm_instances(channel=super_channel) + self._ping_and_warm_instances(channel=super_channel) while not self._is_closed.is_set(): - await CrossSync._Sync_Impl.event_wait( + CrossSync._Sync_Impl.event_wait( self._is_closed, next_sleep, async_break_early=False ) if self._is_closed.is_set(): break start_timestamp = time.monotonic() new_channel = super_channel.create_channel() - await self._ping_and_warm_instances(channel=new_channel) + self._ping_and_warm_instances(channel=new_channel) old_channel = super_channel.swap_channel(new_channel) self._invalidate_channel_stubs() if grace_period: From 65f15de1ae4e7712b53c6bf23220e541c19e4d08 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 9 Sep 2025 14:24:49 -0700 Subject: [PATCH 05/12] updated system test --- tests/system/data/test_system_async.py | 19 ++++++++----- tests/system/data/test_system_autogen.py | 34 +++++++++++++----------- 2 files changed, 30 insertions(+), 23 deletions(-) diff --git a/tests/system/data/test_system_async.py b/tests/system/data/test_system_async.py index c96570b76..39c454996 100644 --- a/tests/system/data/test_system_async.py +++ b/tests/system/data/test_system_async.py @@ -285,23 +285,28 @@ async def test_channel_refresh(self, table_id, instance_id, temp_rows): async with client.get_table(instance_id, table_id) as table: rows = await table.read_rows({}) channel_wrapper = client.transport.grpc_channel - first_channel = client.transport.grpc_channel._channel + first_channel = channel_wrapper._channel assert len(rows) == 2 await CrossSync.sleep(2) rows_after_refresh = await table.read_rows({}) assert len(rows_after_refresh) == 2 assert client.transport.grpc_channel is channel_wrapper - assert client.transport.grpc_channel._channel is not first_channel - # ensure gapic's logging interceptor is still active + updated_channel = channel_wrapper._channel + assert updated_channel is not first_channel + # ensure interceptors are kept (gapic's logging interceptor, and metric interceptor) if CrossSync.is_async: - interceptors = ( - client.transport.grpc_channel._channel._unary_unary_interceptors - ) - assert GapicInterceptor in [type(i) for i in interceptors] + unary_interceptors = updated_channel._unary_unary_interceptors + assert len(unary_interceptors) == 2 + assert GapicInterceptor in [type(i) for i in unary_interceptors] + assert client._metrics_interceptor in unary_interceptors + stream_interceptors = updated_channel._unary_stream_interceptors + assert len(stream_interceptors) == 1 + assert client._metrics_interceptor in stream_interceptors else: assert isinstance( client.transport._logged_channel._interceptor, GapicInterceptor ) + assert updated_channel._interceptor == client._metrics_interceptor finally: await client.close() diff --git a/tests/system/data/test_system_autogen.py b/tests/system/data/test_system_autogen.py index a78a8eb4c..37c00f2ae 100644 --- a/tests/system/data/test_system_autogen.py +++ b/tests/system/data/test_system_autogen.py @@ -237,16 +237,18 @@ def test_channel_refresh(self, table_id, instance_id, temp_rows): with client.get_table(instance_id, table_id) as table: rows = table.read_rows({}) channel_wrapper = client.transport.grpc_channel - first_channel = client.transport.grpc_channel._channel + first_channel = channel_wrapper._channel assert len(rows) == 2 CrossSync._Sync_Impl.sleep(2) rows_after_refresh = table.read_rows({}) assert len(rows_after_refresh) == 2 assert client.transport.grpc_channel is channel_wrapper - assert client.transport.grpc_channel._channel is not first_channel + updated_channel = channel_wrapper._channel + assert updated_channel is not first_channel assert isinstance( client.transport._logged_channel._interceptor, GapicInterceptor ) + assert updated_channel._interceptor == client._metrics_interceptor finally: client.close() @@ -258,7 +260,7 @@ def test_mutation_set_cell(self, target, temp_rows): """Ensure cells can be set properly""" row_key = b"bulk_mutate" new_value = uuid.uuid4().hex.encode() - row_key, mutation = self._create_row_and_mutation( + (row_key, mutation) = self._create_row_and_mutation( target, temp_rows, new_value=new_value ) target.mutate_row(row_key, mutation) @@ -312,7 +314,7 @@ def test_bulk_mutations_set_cell(self, client, target, temp_rows): from google.cloud.bigtable.data.mutations import RowMutationEntry new_value = uuid.uuid4().hex.encode() - row_key, mutation = self._create_row_and_mutation( + (row_key, mutation) = self._create_row_and_mutation( target, temp_rows, new_value=new_value ) bulk_mutation = RowMutationEntry(row_key, [mutation]) @@ -347,11 +349,11 @@ def test_mutations_batcher_context_manager(self, client, target, temp_rows): """test batcher with context manager. Should flush on exit""" from google.cloud.bigtable.data.mutations import RowMutationEntry - new_value, new_value2 = [uuid.uuid4().hex.encode() for _ in range(2)] - row_key, mutation = self._create_row_and_mutation( + (new_value, new_value2) = [uuid.uuid4().hex.encode() for _ in range(2)] + (row_key, mutation) = self._create_row_and_mutation( target, temp_rows, new_value=new_value ) - row_key2, mutation2 = self._create_row_and_mutation( + (row_key2, mutation2) = self._create_row_and_mutation( target, temp_rows, new_value=new_value2 ) bulk_mutation = RowMutationEntry(row_key, [mutation]) @@ -372,7 +374,7 @@ def test_mutations_batcher_timer_flush(self, client, target, temp_rows): from google.cloud.bigtable.data.mutations import RowMutationEntry new_value = uuid.uuid4().hex.encode() - row_key, mutation = self._create_row_and_mutation( + (row_key, mutation) = self._create_row_and_mutation( target, temp_rows, new_value=new_value ) bulk_mutation = RowMutationEntry(row_key, [mutation]) @@ -394,12 +396,12 @@ def test_mutations_batcher_count_flush(self, client, target, temp_rows): """batch should flush after flush_limit_mutation_count mutations""" from google.cloud.bigtable.data.mutations import RowMutationEntry - new_value, new_value2 = [uuid.uuid4().hex.encode() for _ in range(2)] - row_key, mutation = self._create_row_and_mutation( + (new_value, new_value2) = [uuid.uuid4().hex.encode() for _ in range(2)] + (row_key, mutation) = self._create_row_and_mutation( target, temp_rows, new_value=new_value ) bulk_mutation = RowMutationEntry(row_key, [mutation]) - row_key2, mutation2 = self._create_row_and_mutation( + (row_key2, mutation2) = self._create_row_and_mutation( target, temp_rows, new_value=new_value2 ) bulk_mutation2 = RowMutationEntry(row_key2, [mutation2]) @@ -426,12 +428,12 @@ def test_mutations_batcher_bytes_flush(self, client, target, temp_rows): """batch should flush after flush_limit_bytes bytes""" from google.cloud.bigtable.data.mutations import RowMutationEntry - new_value, new_value2 = [uuid.uuid4().hex.encode() for _ in range(2)] - row_key, mutation = self._create_row_and_mutation( + (new_value, new_value2) = [uuid.uuid4().hex.encode() for _ in range(2)] + (row_key, mutation) = self._create_row_and_mutation( target, temp_rows, new_value=new_value ) bulk_mutation = RowMutationEntry(row_key, [mutation]) - row_key2, mutation2 = self._create_row_and_mutation( + (row_key2, mutation2) = self._create_row_and_mutation( target, temp_rows, new_value=new_value2 ) bulk_mutation2 = RowMutationEntry(row_key2, [mutation2]) @@ -457,11 +459,11 @@ def test_mutations_batcher_no_flush(self, client, target, temp_rows): new_value = uuid.uuid4().hex.encode() start_value = b"unchanged" - row_key, mutation = self._create_row_and_mutation( + (row_key, mutation) = self._create_row_and_mutation( target, temp_rows, start_value=start_value, new_value=new_value ) bulk_mutation = RowMutationEntry(row_key, [mutation]) - row_key2, mutation2 = self._create_row_and_mutation( + (row_key2, mutation2) = self._create_row_and_mutation( target, temp_rows, start_value=start_value, new_value=new_value ) bulk_mutation2 = RowMutationEntry(row_key2, [mutation2]) From 16c5b6adccb8e3da0d59d9a51114b6bb9a45aa89 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 9 Sep 2025 14:30:50 -0700 Subject: [PATCH 06/12] fixed lint --- google/cloud/bigtable/data/_async/client.py | 2 ++ google/cloud/bigtable/data/_sync_autogen/client.py | 3 ++- tests/unit/data/_async/test_metrics_interceptor.py | 3 --- tests/unit/data/_sync_autogen/test_metrics_interceptor.py | 3 --- 4 files changed, 4 insertions(+), 7 deletions(-) diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index 2772198ca..51622fc0b 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -19,6 +19,7 @@ cast, Any, AsyncIterable, + Callable, Optional, Set, Sequence, @@ -278,6 +279,7 @@ def _build_grpc_channel(self, *args, **kwargs) -> SwappableChannelType: Returns: a custom wrapped swappable channel """ + create_channel_fn: Callable[[], Any] if self._emulator_host is not None: # emulators use insecure channel create_channel_fn = partial(insecure_channel, self._emulator_host) diff --git a/google/cloud/bigtable/data/_sync_autogen/client.py b/google/cloud/bigtable/data/_sync_autogen/client.py index 56823d693..16d906167 100644 --- a/google/cloud/bigtable/data/_sync_autogen/client.py +++ b/google/cloud/bigtable/data/_sync_autogen/client.py @@ -17,7 +17,7 @@ # This file is automatically generated by CrossSync. Do not edit manually. from __future__ import annotations -from typing import cast, Any, Optional, Set, Sequence, TYPE_CHECKING +from typing import cast, Any, Callable, Optional, Set, Sequence, TYPE_CHECKING import abc import time import warnings @@ -206,6 +206,7 @@ def _build_grpc_channel(self, *args, **kwargs) -> SwappableChannelType: - **kwargs: keyword arguments passed by the gapic layer to create a new channel with Returns: a custom wrapped swappable channel""" + create_channel_fn: Callable[[], Any] if self._emulator_host is not None: create_channel_fn = partial(insecure_channel, self._emulator_host) else: diff --git a/tests/unit/data/_async/test_metrics_interceptor.py b/tests/unit/data/_async/test_metrics_interceptor.py index 99d6d5a5e..366194c7f 100644 --- a/tests/unit/data/_async/test_metrics_interceptor.py +++ b/tests/unit/data/_async/test_metrics_interceptor.py @@ -104,7 +104,6 @@ async def test_unary_unary_interceptor_failure_generic(self): instance = self._make_one() exc = ValueError("test") continuation = CrossSync.Mock(side_effect=exc) - call = continuation.return_value details = mock.Mock() request = mock.Mock() with pytest.raises(ValueError) as e: @@ -119,7 +118,6 @@ async def test_unary_stream_interceptor_success(self): instance = self._make_one() continuation = CrossSync.Mock(return_value=_make_mock_stream_call([1, 2])) - call = continuation.return_value details = mock.Mock() request = mock.Mock() wrapper = await instance.intercept_unary_stream(continuation, details, request) @@ -133,7 +131,6 @@ async def test_unary_stream_interceptor_failure_mid_stream(self): instance = self._make_one() exc = ValueError("test") continuation = CrossSync.Mock(return_value=_make_mock_stream_call([1], exc=exc)) - call = continuation.return_value details = mock.Mock() request = mock.Mock() wrapper = await instance.intercept_unary_stream(continuation, details, request) diff --git a/tests/unit/data/_sync_autogen/test_metrics_interceptor.py b/tests/unit/data/_sync_autogen/test_metrics_interceptor.py index 596d3306e..cf2c8f8da 100644 --- a/tests/unit/data/_sync_autogen/test_metrics_interceptor.py +++ b/tests/unit/data/_sync_autogen/test_metrics_interceptor.py @@ -82,7 +82,6 @@ def test_unary_unary_interceptor_failure_generic(self): instance = self._make_one() exc = ValueError("test") continuation = CrossSync._Sync_Impl.Mock(side_effect=exc) - call = continuation.return_value details = mock.Mock() request = mock.Mock() with pytest.raises(ValueError) as e: @@ -96,7 +95,6 @@ def test_unary_stream_interceptor_success(self): continuation = CrossSync._Sync_Impl.Mock( return_value=_make_mock_stream_call([1, 2]) ) - call = continuation.return_value details = mock.Mock() request = mock.Mock() wrapper = instance.intercept_unary_stream(continuation, details, request) @@ -111,7 +109,6 @@ def test_unary_stream_interceptor_failure_mid_stream(self): continuation = CrossSync._Sync_Impl.Mock( return_value=_make_mock_stream_call([1], exc=exc) ) - call = continuation.return_value details = mock.Mock() request = mock.Mock() wrapper = instance.intercept_unary_stream(continuation, details, request) From 00cc52fce7ba04e011c6b098dcbddd02ebe6dea5 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 9 Sep 2025 14:35:24 -0700 Subject: [PATCH 07/12] fixed annotation --- google/cloud/bigtable/data/_async/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index 51622fc0b..cf3e1860b 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -279,7 +279,7 @@ def _build_grpc_channel(self, *args, **kwargs) -> SwappableChannelType: Returns: a custom wrapped swappable channel """ - create_channel_fn: Callable[[], Any] + create_channel_fn: Callable[[], Channel] if self._emulator_host is not None: # emulators use insecure channel create_channel_fn = partial(insecure_channel, self._emulator_host) From bb00b8b9cf392ffec4bc04326fcd0cd026c2e8fd Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Mon, 15 Sep 2025 10:59:23 -0700 Subject: [PATCH 08/12] re-generated sync classes; removed test --- google/cloud/bigtable/data/_sync_autogen/client.py | 2 +- tests/unit/data/_async/test_metrics_interceptor.py | 4 ---- tests/unit/data/_sync_autogen/test_metrics_interceptor.py | 4 ---- 3 files changed, 1 insertion(+), 9 deletions(-) diff --git a/google/cloud/bigtable/data/_sync_autogen/client.py b/google/cloud/bigtable/data/_sync_autogen/client.py index 16d906167..3d718f966 100644 --- a/google/cloud/bigtable/data/_sync_autogen/client.py +++ b/google/cloud/bigtable/data/_sync_autogen/client.py @@ -206,7 +206,7 @@ def _build_grpc_channel(self, *args, **kwargs) -> SwappableChannelType: - **kwargs: keyword arguments passed by the gapic layer to create a new channel with Returns: a custom wrapped swappable channel""" - create_channel_fn: Callable[[], Any] + create_channel_fn: Callable[[], Channel] if self._emulator_host is not None: create_channel_fn = partial(insecure_channel, self._emulator_host) else: diff --git a/tests/unit/data/_async/test_metrics_interceptor.py b/tests/unit/data/_async/test_metrics_interceptor.py index 366194c7f..59fb06c47 100644 --- a/tests/unit/data/_async/test_metrics_interceptor.py +++ b/tests/unit/data/_async/test_metrics_interceptor.py @@ -67,10 +67,6 @@ def _get_target_class(): def _make_one(self, *args, **kwargs): return self._get_target_class()(*args, **kwargs) - def test_ctor(self): - instance = self._make_one() - assert instance.operation_map == {} - @CrossSync.pytest async def test_unary_unary_interceptor_success(self): """Test that interceptor handles successful unary-unary calls""" diff --git a/tests/unit/data/_sync_autogen/test_metrics_interceptor.py b/tests/unit/data/_sync_autogen/test_metrics_interceptor.py index cf2c8f8da..31430ad84 100644 --- a/tests/unit/data/_sync_autogen/test_metrics_interceptor.py +++ b/tests/unit/data/_sync_autogen/test_metrics_interceptor.py @@ -50,10 +50,6 @@ def _get_target_class(): def _make_one(self, *args, **kwargs): return self._get_target_class()(*args, **kwargs) - def test_ctor(self): - instance = self._make_one() - assert instance.operation_map == {} - def test_unary_unary_interceptor_success(self): """Test that interceptor handles successful unary-unary calls""" instance = self._make_one() From c89f6d4b0fda1305910a5590f7eba9b7de0f0864 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Mon, 15 Sep 2025 14:20:51 -0700 Subject: [PATCH 09/12] loosen test --- tests/unit/data/test_sync_up_to_date.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/data/test_sync_up_to_date.py b/tests/unit/data/test_sync_up_to_date.py index d4623a6c8..e6bce9cf6 100644 --- a/tests/unit/data/test_sync_up_to_date.py +++ b/tests/unit/data/test_sync_up_to_date.py @@ -90,7 +90,7 @@ def test_verify_headers(sync_file): \#\ distributed\ under\ the\ License\ is\ distributed\ on\ an\ \"AS\ IS\"\ BASIS,\n \#\ WITHOUT\ WARRANTIES\ OR\ CONDITIONS\ OF\ ANY\ KIND,\ either\ express\ or\ implied\.\n \#\ See\ the\ License\ for\ the\ specific\ language\ governing\ permissions\ and\n - \#\ limitations\ under\ the\ License\. + \#\ limitations\ under\ the\ License """ pattern = re.compile(license_regex, re.VERBOSE) From 144d75e08ec0622ffb45500220cb1ab3bc0eb606 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Mon, 29 Sep 2025 23:07:14 -0700 Subject: [PATCH 10/12] broke out streaming wrapper into static function --- .../data/_async/metrics_interceptor.py | 37 ++++++++++++++----- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/google/cloud/bigtable/data/_async/metrics_interceptor.py b/google/cloud/bigtable/data/_async/metrics_interceptor.py index 10bed46ca..a60562d0b 100644 --- a/google/cloud/bigtable/data/_async/metrics_interceptor.py +++ b/google/cloud/bigtable/data/_async/metrics_interceptor.py @@ -36,6 +36,12 @@ class AsyncBigtableMetricsInterceptor( @CrossSync.convert async def intercept_unary_unary(self, continuation, client_call_details, request): + """ + Interceptor for unary rpcs: + - MutateRow + - CheckAndMutateRow + - ReadModifyWriteRow + """ try: call = await continuation(client_call_details, request) return call @@ -44,16 +50,29 @@ async def intercept_unary_unary(self, continuation, client_call_details, request @CrossSync.convert async def intercept_unary_stream(self, continuation, client_call_details, request): - async def response_wrapper(call): - try: - async for response in call: - yield response - except Exception as e: - # handle errors while processing stream - raise e - + """ + Interceptor for streaming rpcs: + - ReadRows + - MutateRows + - SampleRowKeys + """ try: - return response_wrapper(await continuation(client_call_details, request)) + return self._streaming_generator_wrapper( + await continuation(client_call_details, request) + ) except Exception as rpc_error: # handle errors while intializing stream raise rpc_error + + @staticmethod + @CrossSync.convert + async def _streaming_generator_wrapper(call): + """ + Wrapped generator to be returned by intercept_unary_stream + """ + try: + async for response in call: + yield response + except Exception as e: + # handle errors while processing stream + raise From c433f3c66199d4c4035fbc0978f14ed46197a202 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Mon, 29 Sep 2025 23:42:11 -0700 Subject: [PATCH 11/12] fixed lint --- .../data/_async/metrics_interceptor.py | 2 +- .../data/_sync_autogen/metrics_interceptor.py | 28 +++++++++++++------ 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/google/cloud/bigtable/data/_async/metrics_interceptor.py b/google/cloud/bigtable/data/_async/metrics_interceptor.py index a60562d0b..89bc6df5a 100644 --- a/google/cloud/bigtable/data/_async/metrics_interceptor.py +++ b/google/cloud/bigtable/data/_async/metrics_interceptor.py @@ -75,4 +75,4 @@ async def _streaming_generator_wrapper(call): yield response except Exception as e: # handle errors while processing stream - raise + raise e diff --git a/google/cloud/bigtable/data/_sync_autogen/metrics_interceptor.py b/google/cloud/bigtable/data/_sync_autogen/metrics_interceptor.py index 46dcd088f..1c71c6b31 100644 --- a/google/cloud/bigtable/data/_sync_autogen/metrics_interceptor.py +++ b/google/cloud/bigtable/data/_sync_autogen/metrics_interceptor.py @@ -27,6 +27,10 @@ class BigtableMetricsInterceptor( """ def intercept_unary_unary(self, continuation, client_call_details, request): + """Interceptor for unary rpcs: + - MutateRow + - CheckAndMutateRow + - ReadModifyWriteRow""" try: call = continuation(client_call_details, request) return call @@ -34,14 +38,22 @@ def intercept_unary_unary(self, continuation, client_call_details, request): raise rpc_error def intercept_unary_stream(self, continuation, client_call_details, request): - def response_wrapper(call): - try: - for response in call: - yield response - except Exception as e: - raise e - + """Interceptor for streaming rpcs: + - ReadRows + - MutateRows + - SampleRowKeys""" try: - return response_wrapper(continuation(client_call_details, request)) + return self._streaming_generator_wrapper( + continuation(client_call_details, request) + ) except Exception as rpc_error: raise rpc_error + + @staticmethod + def _streaming_generator_wrapper(call): + """Wrapped generator to be returned by intercept_unary_stream""" + try: + for response in call: + yield response + except Exception as e: + raise e From 6788df3b404093ace182623d65d219d9d34871cf Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 10 Oct 2025 13:52:14 -0700 Subject: [PATCH 12/12] address PR comments --- google/cloud/bigtable/data/_async/client.py | 18 +++++++++++------- .../data/_async/metrics_interceptor.py | 4 ++-- .../bigtable/data/_sync_autogen/client.py | 7 ++++--- .../data/_sync_autogen/metrics_interceptor.py | 4 ++-- .../data/_async/test_metrics_interceptor.py | 4 ++-- .../_sync_autogen/test_metrics_interceptor.py | 4 ++-- 6 files changed, 23 insertions(+), 18 deletions(-) diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index cf3e1860b..02220fcfd 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -101,7 +101,7 @@ AsyncSwappableChannel as SwappableChannelType, ) from google.cloud.bigtable.data._async.metrics_interceptor import ( - AsyncBigtableMetricsInterceptor as MetricInterceptorType, + AsyncBigtableMetricsInterceptor as MetricsInterceptorType, ) else: from typing import Iterable # noqa: F401 @@ -114,7 +114,7 @@ SwappableChannel as SwappableChannelType, ) from google.cloud.bigtable.data._sync_autogen.metrics_interceptor import ( # noqa: F401 - BigtableMetricsInterceptor as MetricInterceptorType, + BigtableMetricsInterceptor as MetricsInterceptorType, ) if TYPE_CHECKING: @@ -210,7 +210,7 @@ def __init__( credentials = google.auth.credentials.AnonymousCredentials() if project is None: project = _DEFAULT_BIGTABLE_EMULATOR_CLIENT - self._metrics_interceptor = MetricInterceptorType() + self._metrics_interceptor = MetricsInterceptorType() # initialize client ClientWithProject.__init__( self, @@ -281,21 +281,25 @@ def _build_grpc_channel(self, *args, **kwargs) -> SwappableChannelType: """ create_channel_fn: Callable[[], Channel] if self._emulator_host is not None: - # emulators use insecure channel + # Emulators use insecure channels create_channel_fn = partial(insecure_channel, self._emulator_host) elif CrossSync.is_async: + # For async client, use the default create_channel. create_channel_fn = partial(TransportType.create_channel, *args, **kwargs) else: - # attach sync interceptors in create_channel_fn - def create_channel_fn(): + # For sync client, wrap create_channel with interceptors. + def sync_create_channel_fn(): return intercept_channel( TransportType.create_channel(*args, **kwargs), self._metrics_interceptor, ) + create_channel_fn = sync_create_channel_fn + + # Instantiate SwappableChannelType with the determined creation function. new_channel = SwappableChannelType(create_channel_fn) if CrossSync.is_async: - # attach async interceptors + # Attach async interceptors to the channel instance itself. new_channel._unary_unary_interceptors.append(self._metrics_interceptor) new_channel._unary_stream_interceptors.append(self._metrics_interceptor) return new_channel diff --git a/google/cloud/bigtable/data/_async/metrics_interceptor.py b/google/cloud/bigtable/data/_async/metrics_interceptor.py index 89bc6df5a..a154c0083 100644 --- a/google/cloud/bigtable/data/_async/metrics_interceptor.py +++ b/google/cloud/bigtable/data/_async/metrics_interceptor.py @@ -10,7 +10,7 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -# limitations under the License +# limitations under the License. from __future__ import annotations from google.cloud.bigtable.data._cross_sync import CrossSync @@ -68,7 +68,7 @@ async def intercept_unary_stream(self, continuation, client_call_details, reques @CrossSync.convert async def _streaming_generator_wrapper(call): """ - Wrapped generator to be returned by intercept_unary_stream + Wrapped generator to be returned by intercept_unary_stream. """ try: async for response in call: diff --git a/google/cloud/bigtable/data/_sync_autogen/client.py b/google/cloud/bigtable/data/_sync_autogen/client.py index 3d718f966..e73d6e94c 100644 --- a/google/cloud/bigtable/data/_sync_autogen/client.py +++ b/google/cloud/bigtable/data/_sync_autogen/client.py @@ -85,7 +85,7 @@ SwappableChannel as SwappableChannelType, ) from google.cloud.bigtable.data._sync_autogen.metrics_interceptor import ( - BigtableMetricsInterceptor as MetricInterceptorType, + BigtableMetricsInterceptor as MetricsInterceptorType, ) if TYPE_CHECKING: @@ -149,7 +149,7 @@ def __init__( credentials = google.auth.credentials.AnonymousCredentials() if project is None: project = _DEFAULT_BIGTABLE_EMULATOR_CLIENT - self._metrics_interceptor = MetricInterceptorType() + self._metrics_interceptor = MetricsInterceptorType() ClientWithProject.__init__( self, credentials=credentials, @@ -211,12 +211,13 @@ def _build_grpc_channel(self, *args, **kwargs) -> SwappableChannelType: create_channel_fn = partial(insecure_channel, self._emulator_host) else: - def create_channel_fn(): + def sync_create_channel_fn(): return intercept_channel( TransportType.create_channel(*args, **kwargs), self._metrics_interceptor, ) + create_channel_fn = sync_create_channel_fn new_channel = SwappableChannelType(create_channel_fn) return new_channel diff --git a/google/cloud/bigtable/data/_sync_autogen/metrics_interceptor.py b/google/cloud/bigtable/data/_sync_autogen/metrics_interceptor.py index 1c71c6b31..9e47313b0 100644 --- a/google/cloud/bigtable/data/_sync_autogen/metrics_interceptor.py +++ b/google/cloud/bigtable/data/_sync_autogen/metrics_interceptor.py @@ -10,7 +10,7 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -# limitations under the License +# limitations under the License. # This file is automatically generated by CrossSync. Do not edit manually. @@ -51,7 +51,7 @@ def intercept_unary_stream(self, continuation, client_call_details, request): @staticmethod def _streaming_generator_wrapper(call): - """Wrapped generator to be returned by intercept_unary_stream""" + """Wrapped generator to be returned by intercept_unary_stream.""" try: for response in call: yield response diff --git a/tests/unit/data/_async/test_metrics_interceptor.py b/tests/unit/data/_async/test_metrics_interceptor.py index 59fb06c47..6ea958358 100644 --- a/tests/unit/data/_async/test_metrics_interceptor.py +++ b/tests/unit/data/_async/test_metrics_interceptor.py @@ -81,7 +81,7 @@ async def test_unary_unary_interceptor_success(self): @CrossSync.pytest async def test_unary_unary_interceptor_failure(self): - """test a failed RpcError with metadata""" + """Test a failed RpcError with metadata""" instance = self._make_one() exc = RpcError("test") @@ -95,7 +95,7 @@ async def test_unary_unary_interceptor_failure(self): @CrossSync.pytest async def test_unary_unary_interceptor_failure_generic(self): - """test generic exception""" + """Test generic exception""" instance = self._make_one() exc = ValueError("test") diff --git a/tests/unit/data/_sync_autogen/test_metrics_interceptor.py b/tests/unit/data/_sync_autogen/test_metrics_interceptor.py index 31430ad84..56a6f3650 100644 --- a/tests/unit/data/_sync_autogen/test_metrics_interceptor.py +++ b/tests/unit/data/_sync_autogen/test_metrics_interceptor.py @@ -62,7 +62,7 @@ def test_unary_unary_interceptor_success(self): continuation.assert_called_once_with(details, request) def test_unary_unary_interceptor_failure(self): - """test a failed RpcError with metadata""" + """Test a failed RpcError with metadata""" instance = self._make_one() exc = RpcError("test") continuation = CrossSync._Sync_Impl.Mock(side_effect=exc) @@ -74,7 +74,7 @@ def test_unary_unary_interceptor_failure(self): continuation.assert_called_once_with(details, request) def test_unary_unary_interceptor_failure_generic(self): - """test generic exception""" + """Test generic exception""" instance = self._make_one() exc = ValueError("test") continuation = CrossSync._Sync_Impl.Mock(side_effect=exc)