 import random
 import os
 
+from functools import partial
 
 from google.cloud.bigtable_v2.services.bigtable.client import BigtableClientMeta
 from google.cloud.bigtable_v2.services.bigtable.async_client import BigtableAsyncClient
 )
 from google.cloud.bigtable_v2.types.bigtable import PingAndWarmRequest
 from google.cloud.client import ClientWithProject
-from google.api_core.exceptions import GoogleAPICallError
 from google.cloud.environment_vars import BIGTABLE_EMULATOR  # type: ignore
-from google.api_core import retry_async as retries
+from google.api_core import retry as retries
 from google.api_core.exceptions import DeadlineExceeded
 from google.api_core.exceptions import ServiceUnavailable
 from google.api_core.exceptions import Aborted
 from google.cloud.bigtable.data._helpers import _WarmedInstanceKey
 from google.cloud.bigtable.data._helpers import _CONCURRENCY_LIMIT
 from google.cloud.bigtable.data._helpers import _make_metadata
-from google.cloud.bigtable.data._helpers import _convert_retry_deadline
+from google.cloud.bigtable.data._helpers import _retry_exception_factory
 from google.cloud.bigtable.data._helpers import _validate_timeouts
 from google.cloud.bigtable.data._helpers import _get_retryable_errors
 from google.cloud.bigtable.data._helpers import _get_timeouts
@@ -223,7 +223,7 @@ async def close(self, timeout: float = 2.0):
 
     async def _ping_and_warm_instances(
         self, channel: grpc.aio.Channel, instance_key: _WarmedInstanceKey | None = None
-    ) -> list[GoogleAPICallError | None]:
+    ) -> list[BaseException | None]:
         """
         Prepares the backend for requests on a channel
 
@@ -578,7 +578,6 @@ async def read_rows_stream(
                 will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions
                 from any retries that failed
             - GoogleAPIError: raised if the request encounters an unrecoverable error
-            - IdleTimeout: if iterator was abandoned
         """
         operation_timeout, attempt_timeout = _get_timeouts(
             operation_timeout, attempt_timeout, self
@@ -761,6 +760,9 @@ async def read_rows_sharded(
             for result in batch_result:
                 if isinstance(result, Exception):
                     error_dict[shard_idx] = result
+                elif isinstance(result, BaseException):
+                    # BaseException not expected; raise immediately
+                    raise result
                 else:
                     results_list.extend(result)
                 shard_idx += 1
@@ -872,22 +874,8 @@ async def sample_row_keys(
         # prepare retryable
         retryable_excs = _get_retryable_errors(retryable_errors, self)
         predicate = retries.if_exception_type(*retryable_excs)
-        transient_errors = []
 
-        def on_error_fn(exc):
-            # add errors to list if retryable
-            if predicate(exc):
-                transient_errors.append(exc)
-
-        retry = retries.AsyncRetry(
-            predicate=predicate,
-            timeout=operation_timeout,
-            initial=0.01,
-            multiplier=2,
-            maximum=60,
-            on_error=on_error_fn,
-            is_stream=False,
-        )
+        sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)
 
         # prepare request
         metadata = _make_metadata(self.table_name, self.app_profile_id)
@@ -902,10 +890,13 @@ async def execute_rpc():
             )
             return [(s.row_key, s.offset_bytes) async for s in results]
 
-        wrapped_fn = _convert_retry_deadline(
-            retry(execute_rpc), operation_timeout, transient_errors, is_async=True
+        return await retries.retry_target_async(
+            execute_rpc,
+            predicate,
+            sleep_generator,
+            operation_timeout,
+            exception_factory=_retry_exception_factory,
         )
-        return await wrapped_fn()
 
     def mutations_batcher(
         self,
@@ -1014,37 +1005,25 @@ async def mutate_row(
         # mutations should not be retried
         predicate = retries.if_exception_type()
 
-        transient_errors = []
-
-        def on_error_fn(exc):
-            if predicate(exc):
-                transient_errors.append(exc)
+        sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)
 
-        retry = retries.AsyncRetry(
-            predicate=predicate,
-            on_error=on_error_fn,
-            timeout=operation_timeout,
-            initial=0.01,
-            multiplier=2,
-            maximum=60,
-        )
-        # wrap rpc in retry logic
-        retry_wrapped = retry(self.client._gapic_client.mutate_row)
-        # convert RetryErrors from retry wrapper into DeadlineExceeded errors
-        deadline_wrapped = _convert_retry_deadline(
-            retry_wrapped, operation_timeout, transient_errors, is_async=True
-        )
-        metadata = _make_metadata(self.table_name, self.app_profile_id)
-        # trigger rpc
-        await deadline_wrapped(
+        target = partial(
+            self.client._gapic_client.mutate_row,
             row_key=row_key.encode("utf-8") if isinstance(row_key, str) else row_key,
             mutations=[mutation._to_pb() for mutation in mutations_list],
             table_name=self.table_name,
             app_profile_id=self.app_profile_id,
             timeout=attempt_timeout,
-            metadata=metadata,
+            metadata=_make_metadata(self.table_name, self.app_profile_id),
             retry=None,
         )
+        return await retries.retry_target_async(
+            target,
+            predicate,
+            sleep_generator,
+            operation_timeout,
+            exception_factory=_retry_exception_factory,
+        )
 
     async def bulk_mutate_rows(
         self,
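
The pattern adopted above can be illustrated in isolation. Below is a minimal, hedged sketch of calling retry_target_async from google.api_core.retry with a retry predicate, an exponential sleep generator, and an exception_factory. The flaky_rpc coroutine and simple_exception_factory helper are hypothetical stand-ins introduced only for this example (they are not part of this commit or of the Bigtable client); the real call sites pass _retry_exception_factory from the data._helpers module, as shown in the diff.

import asyncio

from google.api_core import retry as retries
from google.api_core.exceptions import ServiceUnavailable


async def flaky_rpc():
    # Hypothetical stand-in for a gapic coroutine such as
    # client._gapic_client.mutate_row(...) wrapped in functools.partial.
    return "ok"


def simple_exception_factory(exc_list, reason, timeout_val):
    # Illustrative only: decide which exception to raise once retries give up.
    # Returns (exception to raise, exception to chain from).
    final = exc_list[-1] if exc_list else RuntimeError(f"retry failed ({reason})")
    return final, None


async def main():
    # Retry only on ServiceUnavailable; back off from 10ms up to 60s, doubling each attempt.
    predicate = retries.if_exception_type(ServiceUnavailable)
    sleep_generator = retries.exponential_sleep_generator(0.01, 60, multiplier=2)
    result = await retries.retry_target_async(
        flaky_rpc,
        predicate,
        sleep_generator,
        timeout=10.0,
        exception_factory=simple_exception_factory,
    )
    print(result)


asyncio.run(main())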