@@ -93,7 +93,7 @@ def _restart_on_unavailable(
9393 item_buffer : List [PartialResultSet ] = []
9494
9595 if transaction is not None :
96- transaction_selector = transaction ._make_txn_selector ()
96+ transaction_selector = transaction ._build_transaction_selector_pb ()
9797 elif transaction_selector is None :
9898 raise InvalidArgument (
9999 "Either transaction or transaction_selector should be set"
@@ -149,7 +149,7 @@ def _restart_on_unavailable(
149149 ) as span , MetricsCapture ():
150150 request .resume_token = resume_token
151151 if transaction is not None :
152- transaction_selector = transaction ._make_txn_selector ()
152+ transaction_selector = transaction ._build_transaction_selector_pb ()
153153 request .transaction = transaction_selector
154154 attempt += 1
155155 iterator = method (
@@ -180,7 +180,7 @@ def _restart_on_unavailable(
180180 ) as span , MetricsCapture ():
181181 request .resume_token = resume_token
182182 if transaction is not None :
183- transaction_selector = transaction ._make_txn_selector ()
183+ transaction_selector = transaction ._build_transaction_selector_pb ()
184184 attempt += 1
185185 request .transaction = transaction_selector
186186 iterator = method (
@@ -238,17 +238,6 @@ def __init__(self, session):
238238 # threads, so we need to use a lock when updating the transaction.
239239 self ._lock : threading .Lock = threading .Lock ()
240240
241- def _make_txn_selector (self ):
242- """Helper for :meth:`read` / :meth:`execute_sql`.
243-
244- Subclasses must override, returning an instance of
245- :class:`transaction_pb2.TransactionSelector`
246- appropriate for making ``read`` / ``execute_sql`` requests
247-
248- :raises: NotImplementedError, always
249- """
250- raise NotImplementedError
251-
252241 def begin (self ) -> bytes :
253242 """Begins a transaction on the database.
254243
@@ -732,7 +721,7 @@ def partition_read(
732721 metadata .append (
733722 _metadata_with_leader_aware_routing (database ._route_to_leader_enabled )
734723 )
735- transaction = self ._make_txn_selector ()
724+ transaction = self ._build_transaction_selector_pb ()
736725 partition_options = PartitionOptions (
737726 partition_size_bytes = partition_size_bytes , max_partitions = max_partitions
738727 )
@@ -854,7 +843,7 @@ def partition_query(
854843 metadata .append (
855844 _metadata_with_leader_aware_routing (database ._route_to_leader_enabled )
856845 )
857- transaction = self ._make_txn_selector ()
846+ transaction = self ._build_transaction_selector_pb ()
858847 partition_options = PartitionOptions (
859848 partition_size_bytes = partition_size_bytes , max_partitions = max_partitions
860849 )
@@ -944,7 +933,7 @@ def _begin_transaction(self, mutation: Mutation = None) -> bytes:
944933 def wrapped_method ():
945934 begin_transaction_request = BeginTransactionRequest (
946935 session = session .name ,
947- options = self ._make_txn_selector ().begin ,
936+ options = self ._build_transaction_selector_pb ().begin ,
948937 mutation_key = mutation ,
949938 )
950939 begin_transaction_method = functools .partial (
@@ -983,6 +972,34 @@ def before_next_retry(nth_retry, delay_in_seconds):
983972 self ._update_for_transaction_pb (transaction_pb )
984973 return self ._transaction_id
985974
975+ def _build_transaction_options_pb (self ) -> TransactionOptions :
976+ """Builds and returns the transaction options for this snapshot.
977+
978+ :rtype: :class:`transaction_pb2.TransactionOptions`
979+ :returns: the transaction options for this snapshot.
980+ """
981+ raise NotImplementedError
982+
983+ def _build_transaction_selector_pb (self ) -> TransactionSelector :
984+ """Builds and returns a transaction selector for this snapshot.
985+
986+ :rtype: :class:`transaction_pb2.TransactionSelector`
987+ :returns: a transaction selector for this snapshot.
988+ """
989+
990+ # Select a previously begun transaction.
991+ if self ._transaction_id is not None :
992+ return TransactionSelector (id = self ._transaction_id )
993+
994+ options = self ._build_transaction_options_pb ()
995+
996+ # Select a single-use transaction.
997+ if not self ._multi_use :
998+ return TransactionSelector (single_use = options )
999+
1000+ # Select a new, multi-use transaction.
1001+ return TransactionSelector (begin = options )
1002+
9861003 def _update_for_result_set_pb (
9871004 self , result_set_pb : Union [ResultSet , PartialResultSet ]
9881005 ) -> None :
@@ -1101,38 +1118,28 @@ def __init__(
11011118 self ._multi_use = multi_use
11021119 self ._transaction_id = transaction_id
11031120
1104- # TODO multiplexed - refactor to base class
1105- def _make_txn_selector (self ):
1106- """Helper for :meth:`read`."""
1107- if self ._transaction_id is not None :
1108- return TransactionSelector (id = self ._transaction_id )
1121+ def _build_transaction_options_pb (self ) -> TransactionOptions :
1122+ """Builds and returns transaction options for this snapshot.
1123+
1124+ :rtype: :class:`transaction_pb2.TransactionOptions`
1125+ :returns: transaction options for this snapshot.
1126+ """
1127+
1128+ read_only_pb_args = dict (return_read_timestamp = True )
11091129
11101130 if self ._read_timestamp :
1111- key = "read_timestamp"
1112- value = self ._read_timestamp
1131+ read_only_pb_args ["read_timestamp" ] = self ._read_timestamp
11131132 elif self ._min_read_timestamp :
1114- key = "min_read_timestamp"
1115- value = self ._min_read_timestamp
1133+ read_only_pb_args ["min_read_timestamp" ] = self ._min_read_timestamp
11161134 elif self ._max_staleness :
1117- key = "max_staleness"
1118- value = self ._max_staleness
1135+ read_only_pb_args ["max_staleness" ] = self ._max_staleness
11191136 elif self ._exact_staleness :
1120- key = "exact_staleness"
1121- value = self ._exact_staleness
1137+ read_only_pb_args ["exact_staleness" ] = self ._exact_staleness
11221138 else :
1123- key = "strong"
1124- value = True
1125-
1126- options = TransactionOptions (
1127- read_only = TransactionOptions .ReadOnly (
1128- ** {key : value , "return_read_timestamp" : True }
1129- )
1130- )
1139+ read_only_pb_args ["strong" ] = True
11311140
1132- if self ._multi_use :
1133- return TransactionSelector (begin = options )
1134- else :
1135- return TransactionSelector (single_use = options )
1141+ read_only_pb = TransactionOptions .ReadOnly (** read_only_pb_args )
1142+ return TransactionOptions (read_only = read_only_pb )
11361143
11371144 def _update_for_transaction_pb (self , transaction_pb : Transaction ) -> None :
11381145 """Updates the snapshot for the given transaction.
0 commit comments