2727from google .protobuf .internal .enum_type_wrapper import EnumTypeWrapper
2828
2929from google .api_core import datetime_helpers
30+ from google .api_core .exceptions import Aborted
3031from google .cloud ._helpers import _date_from_iso8601_date
3132from google .cloud .spanner_v1 import TypeCode
3233from google .cloud .spanner_v1 import ExecuteSqlRequest
3334from google .cloud .spanner_v1 import JsonObject
3435from google .cloud .spanner_v1 .request_id_header import with_request_id
36+ from google .rpc .error_details_pb2 import RetryInfo
37+
38+ import random
3539
3640# Validation error messages
3741NUMERIC_MAX_SCALE_ERR_MSG = (
@@ -466,13 +470,19 @@ def _retry(
466470 delay = 2 ,
467471 allowed_exceptions = None ,
468472 beforeNextRetry = None ,
473+ deadline = None ,
469474):
470475 """
471- Retry a function with a specified number of retries, delay between retries, and list of allowed exceptions.
476+ Retry a specified function with different logic based on the type of exception raised.
477+
478+ If the exception is of type google.api_core.exceptions.Aborted,
479+ apply an alternate retry strategy that relies on the provided deadline value instead of a fixed number of retries.
480+ For all other exceptions, retry the function up to a specified number of times.
472481
473482 Args:
474483 func: The function to be retried.
475484 retry_count: The maximum number of times to retry the function.
485+ deadline: This will be used in case of Aborted transactions.
476486 delay: The delay in seconds between retries.
477487 allowed_exceptions: A tuple of exceptions that are allowed to occur without triggering a retry.
478488 Passing allowed_exceptions as None will lead to retrying for all exceptions.
@@ -481,13 +491,21 @@ def _retry(
481491 The result of the function if it is successful, or raises the last exception if all retries fail.
482492 """
483493 retries = 0
484- while retries <= retry_count :
494+ while True :
485495 if retries > 0 and beforeNextRetry :
486496 beforeNextRetry (retries , delay )
487497
488498 try :
489499 return func ()
490500 except Exception as exc :
501+ if isinstance (exc , Aborted ) and deadline is not None :
502+ if (
503+ allowed_exceptions is not None
504+ and allowed_exceptions .get (exc .__class__ ) is not None
505+ ):
506+ retries += 1
507+ _delay_until_retry (exc , deadline = deadline , attempts = retries )
508+ continue
491509 if (
492510 allowed_exceptions is None or exc .__class__ in allowed_exceptions
493511 ) and retries < retry_count :
@@ -529,6 +547,60 @@ def _metadata_with_leader_aware_routing(value, **kw):
529547 return ("x-goog-spanner-route-to-leader" , str (value ).lower ())
530548
531549
550+ def _delay_until_retry (exc , deadline , attempts ):
551+ """Helper for :meth:`Session.run_in_transaction`.
552+
553+ Detect retryable abort, and impose server-supplied delay.
554+
555+ :type exc: :class:`google.api_core.exceptions.Aborted`
556+ :param exc: exception for aborted transaction
557+
558+ :type deadline: float
559+ :param deadline: maximum timestamp to continue retrying the transaction.
560+
561+ :type attempts: int
562+ :param attempts: number of call retries
563+ """
564+
565+ cause = exc .errors [0 ]
566+ now = time .time ()
567+ if now >= deadline :
568+ raise
569+
570+ delay = _get_retry_delay (cause , attempts )
571+ if delay is not None :
572+ if now + delay > deadline :
573+ raise
574+
575+ time .sleep (delay )
576+
577+
578+ def _get_retry_delay (cause , attempts ):
579+ """Helper for :func:`_delay_until_retry`.
580+
581+ :type exc: :class:`grpc.Call`
582+ :param exc: exception for aborted transaction
583+
584+ :rtype: float
585+ :returns: seconds to wait before retrying the transaction.
586+
587+ :type attempts: int
588+ :param attempts: number of call retries
589+ """
590+ if hasattr (cause , "trailing_metadata" ):
591+ metadata = dict (cause .trailing_metadata ())
592+ else :
593+ metadata = {}
594+ retry_info_pb = metadata .get ("google.rpc.retryinfo-bin" )
595+ if retry_info_pb is not None :
596+ retry_info = RetryInfo ()
597+ retry_info .ParseFromString (retry_info_pb )
598+ nanos = retry_info .retry_delay .nanos
599+ return retry_info .retry_delay .seconds + nanos / 1.0e9
600+
601+ return 2 ** attempts + random .random ()
602+
603+
532604class AtomicCounter :
533605 def __init__ (self , start_value = 0 ):
534606 self .__lock = threading .Lock ()
0 commit comments