Skip to content

Commit 20326d9

Browse files
committed
[ADR-001] Initial draft of bookmark manager
1 parent 799720d commit 20326d9

File tree

10 files changed

+449
-24
lines changed

10 files changed

+449
-24
lines changed

neo4j/_async/bookmark_manager.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
# Copyright (c) "Neo4j"
2+
# Neo4j Sweden AB [https://neo4j.com]
3+
#
4+
# This file is part of Neo4j.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# https://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
18+
19+
from __future__ import annotations
20+
21+
import typing as t
22+
from collections import defaultdict
23+
24+
from .._async_compat.concurrency import (
25+
AsyncCooperativeLock,
26+
AsyncLock,
27+
)
28+
from .._async_compat.util import AsyncUtil
29+
from ..api import AsyncBookmarkManager
30+
31+
32+
class AsyncNeo4jBookmarkManager(AsyncBookmarkManager):
33+
def __init__(self, initial_bookmarks=None, bookmark_supplier=None,
34+
notify_bookmarks=None):
35+
super().__init__()
36+
self._initial_bookmarks = initial_bookmarks
37+
self._bookmark_supplier = bookmark_supplier
38+
self._notify_bookmarks = notify_bookmarks
39+
self._bookmarks = defaultdict(set)
40+
if bookmark_supplier or notify_bookmarks:
41+
self._lock = AsyncLock()
42+
else:
43+
self._lock = AsyncCooperativeLock()
44+
45+
async def update_bookmarks(
46+
self, database: str, previous_bookmarks: t.Iterable[str],
47+
new_bookmarks: t.Iterable[str]
48+
) -> None:
49+
with self._lock:
50+
new_bms = set(new_bookmarks)
51+
if not new_bms:
52+
return
53+
prev_bms = set(previous_bookmarks)
54+
curr_bms = self._bookmarks[database]
55+
curr_bms.difference_update(prev_bms)
56+
curr_bms.update(new_bms)
57+
58+
if self._notify_bookmarks:
59+
await AsyncUtil.callback(
60+
self._notify_bookmarks, database, tuple(curr_bms)
61+
)
62+
63+
async def _get_bookmarks(self, database: str) -> t.Set[str]:
64+
bms = self._bookmarks[database]
65+
if self._bookmark_supplier:
66+
extra_bms = await AsyncUtil.callback(
67+
self._bookmark_supplier, database
68+
)
69+
if extra_bms is not None:
70+
bms &= set(extra_bms)
71+
return bms
72+
73+
async def get_bookmarks(self, database: str) -> t.Set[str]:
74+
with self._lock:
75+
return await self._get_bookmarks(database)
76+
77+
async def get_all_bookmarks(
78+
self, must_included_databases: t.Iterable[str]
79+
) -> t.Set[str]:
80+
with self._lock:
81+
bms = set()
82+
databases = (set(must_included_databases)
83+
| set(self._bookmarks.keys()))
84+
for database in databases:
85+
bms.update(await self._get_bookmarks(database))
86+
return bms

neo4j/_async/driver.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@
4343
)
4444
from ..addressing import Address
4545
from ..api import (
46+
AsyncBookmarkManager,
4647
Auth,
48+
BookmarkManager,
4749
Bookmarks,
4850
DRIVER_BOLT,
4951
DRIVER_NEO4J,
@@ -62,9 +64,14 @@
6264
URI_SCHEME_NEO4J_SECURE,
6365
URI_SCHEME_NEO4J_SELF_SIGNED_CERTIFICATE,
6466
)
67+
from .bookmark_manager import AsyncNeo4jBookmarkManager
6568
from .work import AsyncSession
6669

6770

71+
_T_BmSupplier = t.Callable[[str], t.Union[Bookmarks, t.Awaitable[Bookmarks]]]
72+
_T_NotifyBm = t.Callable[[str, Bookmarks], t.Union[None, t.Awaitable[None]]]
73+
74+
6875
class AsyncGraphDatabase:
6976
"""Accessor for :class:`neo4j.Driver` construction.
7077
"""
@@ -94,6 +101,8 @@ def driver(
94101
ssl_context: ssl.SSLContext = ...,
95102
user_agent: str = ...,
96103
keep_alive: bool = ...,
104+
bookmark_manager: t.Union[AsyncBookmarkManager,
105+
BookmarkManager, None] = ...,
97106

98107
# undocumented/unsupported options
99108
# they may be change or removed any time without prior notice
@@ -208,6 +217,36 @@ def driver(cls, uri, *, auth=None, **config) -> AsyncDriver:
208217
return cls.neo4j_driver(parsed.netloc, auth=auth,
209218
routing_context=routing_context, **config)
210219

220+
@classmethod
221+
def bookmark_manager(
222+
cls, initial_bookmarks: Bookmarks = None,
223+
bookmark_supplier: _T_BmSupplier = None,
224+
notify_bookmarks: _T_NotifyBm = None
225+
) -> AsyncBookmarkManager:
226+
"""Create a default :class:`AsyncBookmarkManager`.
227+
228+
:param initial_bookmarks:
229+
The initial set of bookmarks. The default bookmark manager will
230+
seed the set of bookmarks for each database with this value.
231+
:param bookmark_supplier:
232+
Function which will be called every time the default bookmark
233+
manager's method :meth:`.AsyncBookmarkManager.get_bookmarks`
234+
gets called. The result of ``bookmark_supplier`` will be
235+
concatenated with the internal set of bookmarks and used to
236+
configure the session in creation.
237+
:param notify_bookmarks:
238+
Function which will be called whenever the set of bookmarks
239+
handled by the bookmark manager gets updated with the new
240+
internal bookmark set.
241+
242+
:returns: A default implementation of :class:`AsyncBookmarkManager`.
243+
"""
244+
return AsyncNeo4jBookmarkManager(
245+
initial_bookmarks=initial_bookmarks,
246+
bookmark_supplier=bookmark_supplier,
247+
notify_bookmarks=notify_bookmarks
248+
)
249+
211250
@classmethod
212251
def bolt_driver(cls, target, *, auth=None, **config):
213252
""" Create a driver for direct Bolt server access that uses
@@ -353,6 +392,8 @@ def session(
353392
initial_retry_delay: float = ...,
354393
retry_delay_multiplier: float = ...,
355394
retry_delay_jitter_factor: float = ...,
395+
bookmark_manager: t.Union[AsyncBookmarkManager,
396+
BookmarkManager, None] = ...,
356397
) -> AsyncSession:
357398
...
358399

@@ -394,6 +435,8 @@ async def verify_connectivity(
394435
initial_retry_delay: float = ...,
395436
retry_delay_multiplier: float = ...,
396437
retry_delay_jitter_factor: float = ...,
438+
bookmark_manager: t.Union[AsyncBookmarkManager,
439+
BookmarkManager, None] = ...,
397440
) -> None:
398441
...
399442

@@ -456,6 +499,8 @@ async def get_server_info(
456499
initial_retry_delay: float = ...,
457500
retry_delay_multiplier: float = ...,
458501
retry_delay_jitter_factor: float = ...,
502+
bookmark_manager: t.Union[AsyncBookmarkManager,
503+
BookmarkManager, None] = ...,
459504
) -> ServerInfo:
460505
...
461506

neo4j/_async/work/session.py

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,12 @@ class AsyncSession(AsyncWorkspace):
100100
_state_failed = False
101101

102102
def __init__(self, pool, session_config):
103-
super().__init__(pool, session_config)
104103
assert isinstance(session_config, SessionConfig)
105-
self._bookmarks = self._prepare_bookmarks(session_config.bookmarks)
104+
super().__init__(pool, session_config)
105+
if session_config.bookmarks is not None:
106+
self._bookmarks = self._prepare_bookmarks(session_config.bookmarks)
107+
else:
108+
self._bookmark_manager = session_config.bookmark_manager
106109

107110
async def __aenter__(self) -> AsyncSession:
108111
return self
@@ -147,10 +150,6 @@ async def _disconnect(self, sync=False):
147150
self._handle_cancellation(message="_disconnect")
148151
raise
149152

150-
def _collect_bookmark(self, bookmark):
151-
if bookmark:
152-
self._bookmarks = bookmark,
153-
154153
def _handle_cancellation(self, message="General"):
155154
self._transaction = None
156155
self._auto_result = None
@@ -165,7 +164,7 @@ def _handle_cancellation(self, message="General"):
165164

166165
async def _result_closed(self):
167166
if self._auto_result:
168-
self._collect_bookmark(self._auto_result._bookmark)
167+
await self._update_bookmark(self._auto_result._bookmark)
169168
self._auto_result = None
170169
await self._disconnect()
171170

@@ -196,7 +195,7 @@ async def close(self) -> None:
196195
if self._state_failed is False:
197196
try:
198197
await self._auto_result.consume()
199-
self._collect_bookmark(self._auto_result._bookmark)
198+
await self._update_bookmark(self._auto_result._bookmark)
200199
except Exception as error:
201200
# TODO: Investigate potential non graceful close states
202201
self._auto_result = None
@@ -336,7 +335,7 @@ async def last_bookmark(self) -> t.Optional[str]:
336335
await self._auto_result.consume()
337336

338337
if self._transaction and self._transaction._closed:
339-
self._collect_bookmark(self._transaction._bookmark)
338+
await self._update_bookmark(self._transaction._bookmark)
340339
self._transaction = None
341340

342341
if self._bookmarks:
@@ -377,14 +376,14 @@ async def last_bookmarks(self) -> Bookmarks:
377376
await self._auto_result.consume()
378377

379378
if self._transaction and self._transaction._closed():
380-
self._collect_bookmark(self._transaction._bookmark)
379+
await self._update_bookmark(self._transaction._bookmark)
381380
self._transaction = None
382381

383382
return Bookmarks.from_raw_values(self._bookmarks)
384383

385384
async def _transaction_closed_handler(self):
386385
if self._transaction:
387-
self._collect_bookmark(self._transaction._bookmark)
386+
await self._update_bookmark(self._transaction._bookmark)
388387
self._transaction = None
389388
await self._disconnect()
390389

neo4j/_async/work/workspace.py

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import asyncio
2222

23+
from ..._async_compat.util import AsyncUtil
2324
from ..._conf import WorkspaceConfig
2425
from ..._deadline import Deadline
2526
from ..._meta import (
@@ -44,7 +45,9 @@ def __init__(self, pool, config):
4445
self._connection_access_mode = None
4546
# Sessions are supposed to cache the database on which to operate.
4647
self._cached_database = False
47-
self._bookmarks = None
48+
self._bookmarks = ()
49+
self._bookmark_manager = None
50+
self._last_from_bookmark_manager = None
4851
# Workspace has been closed.
4952
self._closed = False
5053

@@ -77,6 +80,40 @@ def _set_cached_database(self, database):
7780
self._cached_database = True
7881
self._config.database = database
7982

83+
async def _get_bookmarks(self, database):
84+
if self._bookmark_manager is not None:
85+
self._bookmarks = tuple(
86+
await AsyncUtil.callback(
87+
self._bookmark_manager.get_bookmarks, database
88+
)
89+
)
90+
return self._bookmarks
91+
92+
async def _get_all_bookmarks(self, must_included_databases):
93+
if self._bookmark_manager is not None:
94+
self._bookmarks = tuple(
95+
await AsyncUtil.callback(
96+
self._bookmark_manager.get_all_bookmarks,
97+
must_included_databases
98+
)
99+
)
100+
return self._bookmarks
101+
102+
async def _update_bookmarks(self, database, new_bookmarks):
103+
if not new_bookmarks:
104+
return
105+
previous_bookmarks = self._bookmarks
106+
self._bookmarks = new_bookmarks
107+
if self._bookmark_manager is None:
108+
return
109+
await self._bookmark_manager.update_bookmarks(
110+
database, previous_bookmarks, new_bookmarks
111+
)
112+
113+
async def _update_bookmark(self, bookmark):
114+
if bookmark:
115+
await self._update_bookmarks(self._config.database, (bookmark,))
116+
80117
async def _connect(self, access_mode, **acquire_kwargs):
81118
timeout = Deadline(self._config.session_connection_timeout)
82119
if self._connection:

neo4j/_conf.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,10 @@ class WorkspaceConfig(Config):
429429
impersonated_user = None
430430
# Note that you need appropriate permissions to do so.
431431

432+
#: Bookmark Manager
433+
bookmark_manager = None
434+
# Specify the bookmark manager to be used for sessions by default.
435+
432436

433437
class SessionConfig(WorkspaceConfig):
434438
""" Session configuration.

0 commit comments

Comments
 (0)