diff --git a/google/cloud/spanner_v1/_opentelemetry_tracing.py b/google/cloud/spanner_v1/_opentelemetry_tracing.py index feb3b92756..efbeea05e7 100644 --- a/google/cloud/spanner_v1/_opentelemetry_tracing.py +++ b/google/cloud/spanner_v1/_opentelemetry_tracing.py @@ -15,6 +15,7 @@ """Manages OpenTelemetry trace creation and handling""" from contextlib import contextmanager +from datetime import datetime import os from google.cloud.spanner_v1 import SpannerClient @@ -56,6 +57,9 @@ def get_tracer(tracer_provider=None): @contextmanager def trace_call(name, session, extra_attributes=None, observability_options=None): + if session: + session._last_use_time = datetime.now() + if not HAS_OPENTELEMETRY_INSTALLED or not session: # Empty context manager. Users will have to check if the generated value is None or a span yield None diff --git a/google/cloud/spanner_v1/pool.py b/google/cloud/spanner_v1/pool.py index 56837bfc0b..c95ef7a7b9 100644 --- a/google/cloud/spanner_v1/pool.py +++ b/google/cloud/spanner_v1/pool.py @@ -145,7 +145,8 @@ class FixedSizePool(AbstractSessionPool): - Pre-allocates / creates a fixed number of sessions. - "Pings" existing sessions via :meth:`session.exists` before returning - them, and replaces expired sessions. + sessions that have not been used for more than 55 minutes and replaces + expired sessions. - Blocks, with a timeout, when :meth:`get` is called on an empty pool. Raises after timing out. @@ -171,6 +172,7 @@ class FixedSizePool(AbstractSessionPool): DEFAULT_SIZE = 10 DEFAULT_TIMEOUT = 10 + DEFAULT_MAX_AGE_MINUTES = 55 def __init__( self, @@ -178,11 +180,13 @@ def __init__( default_timeout=DEFAULT_TIMEOUT, labels=None, database_role=None, + max_age_minutes=DEFAULT_MAX_AGE_MINUTES, ): super(FixedSizePool, self).__init__(labels=labels, database_role=database_role) self.size = size self.default_timeout = default_timeout self._sessions = queue.LifoQueue(size) + self._max_age = datetime.timedelta(minutes=max_age_minutes) def bind(self, database): """Associate the pool with a database. @@ -230,8 +234,9 @@ def get(self, timeout=None): timeout = self.default_timeout session = self._sessions.get(block=True, timeout=timeout) + age = _NOW() - session.last_use_time - if not session.exists(): + if age >= self._max_age and not session.exists(): session = self._database.session() session.create() diff --git a/google/cloud/spanner_v1/session.py b/google/cloud/spanner_v1/session.py index 6281148590..539f36af2b 100644 --- a/google/cloud/spanner_v1/session.py +++ b/google/cloud/spanner_v1/session.py @@ -17,6 +17,7 @@ from functools import total_ordering import random import time +from datetime import datetime from google.api_core.exceptions import Aborted from google.api_core.exceptions import GoogleAPICallError @@ -69,6 +70,7 @@ def __init__(self, database, labels=None, database_role=None): labels = {} self._labels = labels self._database_role = database_role + self._last_use_time = datetime.utcnow() def __lt__(self, other): return self._session_id < other._session_id @@ -78,6 +80,14 @@ def session_id(self): """Read-only ID, set by the back-end during :meth:`create`.""" return self._session_id + @property + def last_use_time(self): + """ "Approximate last use time of this session + + :rtype: datetime + :returns: the approximate last use time of this session""" + return self._last_use_time + @property def database_role(self): """User-assigned database-role for the session. @@ -222,6 +232,7 @@ def ping(self): metadata = _metadata_with_prefix(self._database.name) request = ExecuteSqlRequest(session=self.name, sql="SELECT 1") api.execute_sql(request=request, metadata=metadata) + self._last_use_time = datetime.now() def snapshot(self, **kw): """Create a snapshot to perform a set of reads with shared staleness. diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index 143e17c503..89b5094706 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -14,6 +14,7 @@ """Model a set of read-only queries to a database as a snapshot.""" +from datetime import datetime import functools import threading from google.protobuf.struct_pb2 import Struct @@ -364,6 +365,7 @@ def read( ) self._read_request_count += 1 + self._session._last_use_time = datetime.now() if self._multi_use: return StreamedResultSet( diff --git a/tests/mockserver_tests/test_basics.py b/tests/mockserver_tests/test_basics.py index f2dab9af06..12a224314f 100644 --- a/tests/mockserver_tests/test_basics.py +++ b/tests/mockserver_tests/test_basics.py @@ -29,7 +29,6 @@ FixedSizePool, BatchCreateSessionsRequest, ExecuteSqlRequest, - GetSessionRequest, ) from google.cloud.spanner_v1.database import Database from google.cloud.spanner_v1.instance import Instance @@ -125,12 +124,9 @@ def test_select1(self): self.assertEqual(1, row[0]) self.assertEqual(1, len(result_list)) requests = self.spanner_service.requests - self.assertEqual(3, len(requests)) + self.assertEqual(2, len(requests), msg=requests) self.assertTrue(isinstance(requests[0], BatchCreateSessionsRequest)) - # TODO: Optimize FixedSizePool so this GetSessionRequest is not executed - # every time a session is fetched. - self.assertTrue(isinstance(requests[1], GetSessionRequest)) - self.assertTrue(isinstance(requests[2], ExecuteSqlRequest)) + self.assertTrue(isinstance(requests[1], ExecuteSqlRequest)) def test_create_table(self): database_admin_api = self.client.database_admin_api diff --git a/tests/unit/test_pool.py b/tests/unit/test_pool.py index 23ed3e7251..2e3b46fa73 100644 --- a/tests/unit/test_pool.py +++ b/tests/unit/test_pool.py @@ -15,6 +15,7 @@ from functools import total_ordering import unittest +from datetime import datetime, timedelta import mock @@ -184,13 +185,30 @@ def test_bind(self): for session in SESSIONS: session.create.assert_not_called() - def test_get_non_expired(self): + def test_get_active(self): pool = self._make_one(size=4) database = _Database("name") SESSIONS = sorted([_Session(database) for i in range(0, 4)]) database._sessions.extend(SESSIONS) pool.bind(database) + # check if sessions returned in LIFO order + for i in (3, 2, 1, 0): + session = pool.get() + self.assertIs(session, SESSIONS[i]) + self.assertFalse(session._exists_checked) + self.assertFalse(pool._sessions.full()) + + def test_get_non_expired(self): + pool = self._make_one(size=4) + database = _Database("name") + last_use_time = datetime.utcnow() - timedelta(minutes=56) + SESSIONS = sorted( + [_Session(database, last_use_time=last_use_time) for i in range(0, 4)] + ) + database._sessions.extend(SESSIONS) + pool.bind(database) + # check if sessions returned in LIFO order for i in (3, 2, 1, 0): session = pool.get() @@ -201,7 +219,8 @@ def test_get_non_expired(self): def test_get_expired(self): pool = self._make_one(size=4) database = _Database("name") - SESSIONS = [_Session(database)] * 5 + last_use_time = datetime.utcnow() - timedelta(minutes=65) + SESSIONS = [_Session(database, last_use_time=last_use_time)] * 5 SESSIONS[0]._exists = False database._sessions.extend(SESSIONS) pool.bind(database) @@ -915,7 +934,9 @@ def _make_transaction(*args, **kw): class _Session(object): _transaction = None - def __init__(self, database, exists=True, transaction=None): + def __init__( + self, database, exists=True, transaction=None, last_use_time=datetime.utcnow() + ): self._database = database self._exists = exists self._exists_checked = False @@ -923,10 +944,15 @@ def __init__(self, database, exists=True, transaction=None): self.create = mock.Mock() self._deleted = False self._transaction = transaction + self._last_use_time = last_use_time def __lt__(self, other): return id(self) < id(other) + @property + def last_use_time(self): + return self._last_use_time + def exists(self): self._exists_checked = True return self._exists