6262from google .cloud .spanner_v1 .pool import BurstyPool
6363from google .cloud .spanner_v1 .pool import SessionCheckout
6464from google .cloud .spanner_v1 .session import Session
65+ from google .cloud .spanner_v1 .session_options import SessionOptions
66+ from google .cloud .spanner_v1 .database_sessions_manager import DatabaseSessionsManager
6567from google .cloud .spanner_v1 .snapshot import _restart_on_unavailable
6668from google .cloud .spanner_v1 .snapshot import Snapshot
6769from google .cloud .spanner_v1 .streamed import StreamedResultSet
@@ -199,6 +201,10 @@ def __init__(
199201
200202 self ._pool = pool
201203 pool .bind (self )
204+
205+ # Initialize session options and sessions manager for multiplexed session support
206+ self .session_options = SessionOptions ()
207+ self ._sessions_manager = DatabaseSessionsManager (self , pool )
202208
203209 @classmethod
204210 def from_pb (cls , database_pb , instance , pool = None ):
@@ -759,7 +765,11 @@ def execute_pdml():
759765 "CloudSpanner.Database.execute_partitioned_pdml" ,
760766 observability_options = self .observability_options ,
761767 ) as span , MetricsCapture ():
762- with SessionCheckout (self ._pool ) as session :
768+ from google .cloud .spanner_v1 .session_options import TransactionType
769+
770+ # Use sessions manager for partitioned DML operations
771+ session = self ._sessions_manager .get_session (TransactionType .PARTITIONED )
772+ try :
763773 add_span_event (span , "Starting BeginTransaction" )
764774 txn = api .begin_transaction (
765775 session = session .name ,
@@ -802,6 +812,8 @@ def execute_pdml():
802812 list (result_set ) # consume all partials
803813
804814 return result_set .stats .row_count_lower_bound
815+ finally :
816+ self ._sessions_manager .put_session (session )
805817
806818 return _retry_on_aborted (execute_pdml , DEFAULT_RETRY_BACKOFF )()
807819
@@ -1240,6 +1252,15 @@ def observability_options(self):
12401252 opts ["db_name" ] = self .name
12411253 return opts
12421254
1255+ @property
1256+ def sessions_manager (self ):
1257+ """Returns the database sessions manager.
1258+
1259+ :rtype: :class:`~google.cloud.spanner_v1.database_sessions_manager.DatabaseSessionsManager`
1260+ :returns: The sessions manager for this database.
1261+ """
1262+ return self ._sessions_manager
1263+
12431264
12441265class BatchCheckout (object ):
12451266 """Context manager for using a batch from a database.
@@ -1290,8 +1311,12 @@ def __init__(
12901311
12911312 def __enter__ (self ):
12921313 """Begin ``with`` block."""
1314+ from google .cloud .spanner_v1 .session_options import TransactionType
1315+
12931316 current_span = get_current_span ()
1294- session = self ._session = self ._database ._pool .get ()
1317+ session = self ._session = self ._database .sessions_manager .get_session (
1318+ TransactionType .READ_WRITE
1319+ )
12951320 add_span_event (current_span , "Using session" , {"id" : session .session_id })
12961321 batch = self ._batch = Batch (session )
12971322 if self ._request_options .transaction_tag :
@@ -1316,7 +1341,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
13161341 "CommitStats: {}" .format (self ._batch .commit_stats ),
13171342 extra = {"commit_stats" : self ._batch .commit_stats },
13181343 )
1319- self ._database ._pool . put (self ._session )
1344+ self ._database .sessions_manager . put_session (self ._session )
13201345 current_span = get_current_span ()
13211346 add_span_event (
13221347 current_span ,
@@ -1344,7 +1369,11 @@ def __init__(self, database):
13441369
13451370 def __enter__ (self ):
13461371 """Begin ``with`` block."""
1347- session = self ._session = self ._database ._pool .get ()
1372+ from google .cloud .spanner_v1 .session_options import TransactionType
1373+
1374+ session = self ._session = self ._database .sessions_manager .get_session (
1375+ TransactionType .READ_WRITE
1376+ )
13481377 return MutationGroups (session )
13491378
13501379 def __exit__ (self , exc_type , exc_val , exc_tb ):
@@ -1355,7 +1384,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
13551384 if not self ._session .exists ():
13561385 self ._session = self ._database ._pool ._new_session ()
13571386 self ._session .create ()
1358- self ._database ._pool . put (self ._session )
1387+ self ._database .sessions_manager . put_session (self ._session )
13591388
13601389
13611390class SnapshotCheckout (object ):
@@ -1383,7 +1412,11 @@ def __init__(self, database, **kw):
13831412
13841413 def __enter__ (self ):
13851414 """Begin ``with`` block."""
1386- session = self ._session = self ._database ._pool .get ()
1415+ from google .cloud .spanner_v1 .session_options import TransactionType
1416+
1417+ session = self ._session = self ._database .sessions_manager .get_session (
1418+ TransactionType .READ_ONLY
1419+ )
13871420 return Snapshot (session , ** self ._kw )
13881421
13891422 def __exit__ (self , exc_type , exc_val , exc_tb ):
@@ -1394,7 +1427,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
13941427 if not self ._session .exists ():
13951428 self ._session = self ._database ._pool ._new_session ()
13961429 self ._session .create ()
1397- self ._database ._pool . put (self ._session )
1430+ self ._database .sessions_manager . put_session (self ._session )
13981431
13991432
14001433class BatchSnapshot (object ):
@@ -1474,10 +1507,13 @@ def _get_session(self):
14741507 all partitions have been processed.
14751508 """
14761509 if self ._session is None :
1477- session = self ._session = self ._database .session ()
1478- if self ._session_id is None :
1479- session .create ()
1480- else :
1510+ from google .cloud .spanner_v1 .session_options import TransactionType
1511+
1512+ # Use sessions manager for partition operations
1513+ session = self ._session = self ._database .sessions_manager .get_session (
1514+ TransactionType .PARTITIONED
1515+ )
1516+ if self ._session_id is not None :
14811517 session ._session_id = self ._session_id
14821518 return self ._session
14831519
0 commit comments