Skip to content
Merged
5 changes: 4 additions & 1 deletion nutkit/frontend/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from .config import Neo4jBookmarkManagerConfig
from .bookmark_manager import (
BookmarkManager,
Neo4jBookmarkManagerConfig,
)
from .driver import Driver
from .exceptions import ApplicationCodeError
83 changes: 83 additions & 0 deletions nutkit/frontend/bookmark_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import (
Any,
Callable,
ClassVar,
Dict,
List,
Optional,
)

from nutkit.backend import Backend

from .. import protocol


@dataclass
class Neo4jBookmarkManagerConfig:
initial_bookmarks: Optional[Dict[str, List[str]]] = None
bookmarks_supplier: Optional[Callable[[str], List[str]]] = None
bookmarks_consumer: Optional[Callable[[str, List[str]], None]] = None


@dataclass
class BookmarkManager:
_registry: ClassVar[Dict[Any, BookmarkManager]] = {}

def __init__(self, backend: Backend, config: Neo4jBookmarkManagerConfig):
self._backend = backend
self.config = config

req = protocol.NewBookmarkManager(
config.initial_bookmarks,
config.bookmarks_supplier is not None,
config.bookmarks_consumer is not None
)
res = backend.send_and_receive(req)
if not isinstance(res, protocol.BookmarkManager):
raise Exception("Should be BookmarkManager but was %s" % res)

self._bookmark_manager = res
self._registry[self._bookmark_manager.id] = self

@property
def id(self):
return self._bookmark_manager.id

@classmethod
def process_callbacks(cls, request):
if isinstance(request, protocol.BookmarksSupplierRequest):
if request.bookmark_manager_id not in cls._registry:
raise Exception(
"Backend provided unknown Bookmark manager id: "
f"{request.bookmark_manager_id} not found"
)

manager = cls._registry[request.bookmark_manager_id]
supply = manager.config.bookmarks_supplier
if supply is not None:
bookmarks = supply(request.database)
return protocol.BookmarksSupplierCompleted(request.id,
bookmarks)
if isinstance(request, protocol.BookmarksConsumerRequest):
if request.bookmark_manager_id not in cls._registry:
raise ValueError(
"Backend provided unknown Bookmark manager id: "
f"{request.bookmark_manager_id} not found"
)
manager = cls._registry[request.bookmark_manager_id]
consume = manager.config.bookmarks_consumer
if consume is not None:
consume(request.database, request.bookmarks)
return protocol.BookmarksConsumerCompleted(request.id)

def close(self, hooks=None):
res = self._backend.send_and_receive(
protocol.BookmarkManagerClose(self.id),
hooks=hooks
)
if not isinstance(res, protocol.BookmarkManager):
raise Exception("Should be BookmarkManager but was %s" % res)
del self._registry[self.id]
29 changes: 0 additions & 29 deletions nutkit/frontend/config.py

This file was deleted.

47 changes: 11 additions & 36 deletions nutkit/frontend/driver.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
from typing import Optional

from .. import protocol
from .config import from_bookmark_manager_config_to_protocol
from .config import Neo4jBookmarkManagerConfig as BMMConfig
from .bookmark_manager import BookmarkManager
from .session import Session


Expand All @@ -13,15 +10,11 @@ def __init__(self, backend, uri, auth_token, user_agent=None,
max_tx_retry_time_ms=None, encrypted=None,
trusted_certificates=None, liveness_check_timeout_ms=None,
max_connection_pool_size=None,
connection_acquisition_timeout_ms=None,
bookmark_manager_config: Optional[BMMConfig] = None):
connection_acquisition_timeout_ms=None):
self._backend = backend
self._resolver_fn = resolver_fn
self._domain_name_resolver_fn = domain_name_resolver_fn
self._bookmark_manager_config = bookmark_manager_config
bookmark_manager = from_bookmark_manager_config_to_protocol(
bookmark_manager_config
)

req = protocol.NewDriver(
uri, auth_token, userAgent=user_agent,
resolverRegistered=resolver_fn is not None,
Expand All @@ -32,7 +25,6 @@ def __init__(self, backend, uri, auth_token, user_agent=None,
liveness_check_timeout_ms=liveness_check_timeout_ms,
max_connection_pool_size=max_connection_pool_size,
connection_acquisition_timeout_ms=connection_acquisition_timeout_ms, # noqa: E501
bookmark_manager=bookmark_manager
)
res = backend.send_and_receive(req)
if not isinstance(res, protocol.Driver):
Expand All @@ -59,29 +51,11 @@ def receive(self, timeout=None, hooks=None, *, allow_resolution):
hooks=hooks
)
continue
if self._bookmark_manager_config is not None:
supply = self._bookmark_manager_config.bookmarks_supplier
if supply is not None:
if isinstance(res, protocol.BookmarksSupplierRequest):
bookmarks = supply(res.database)
self._backend.send(
protocol.BookmarksSupplierCompleted(
res.id,
bookmarks
),
hooks=hooks
)
continue
consume = self._bookmark_manager_config.bookmarks_consumer
if consume is not None:
if isinstance(res, protocol.BookmarksConsumerRequest):
consume(res.database, res.bookmarks)
self._backend.send(
protocol.BookmarksConsumerCompleted(
res.id
)
)
continue
bookmark_manager_response = BookmarkManager.process_callbacks(res)
if bookmark_manager_response:
self._backend.send(bookmark_manager_response, hooks=hooks)
continue

return res

def send(self, req, hooks=None):
Expand Down Expand Up @@ -128,16 +102,17 @@ def close(self):

def session(self, access_mode, bookmarks=None, database=None,
fetch_size=None, impersonated_user=None,
ignore_bookmark_manager=None):
bookmark_manager=None):
req = protocol.NewSession(
self._driver.id, access_mode, bookmarks=bookmarks,
database=database, fetchSize=fetch_size,
impersonatedUser=impersonated_user,
ignore_bookmark_manager=ignore_bookmark_manager
bookmark_manager=bookmark_manager
)
res = self.send_and_receive(req, allow_resolution=False)
if not isinstance(res, protocol.Session):
raise Exception("Should be session")

return Session(self, res)

def resolve(self, address):
Expand Down
46 changes: 35 additions & 11 deletions nutkit/protocol/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ def __init__(
fetchSize=None, maxTxRetryTimeMs=None, encrypted=None,
trustedCertificates=None, liveness_check_timeout_ms=None,
max_connection_pool_size=None,
connection_acquisition_timeout_ms=None,
bookmark_manager=None
connection_acquisition_timeout_ms=None
):
# Neo4j URI to connect to
self.uri = uri
Expand All @@ -88,8 +87,6 @@ def __init__(
self.livenessCheckTimeoutMs = liveness_check_timeout_ms
self.maxConnectionPoolSize = max_connection_pool_size
self.connectionAcquisitionTimeoutMs = connection_acquisition_timeout_ms
if bookmark_manager is not None:
self.bookmarkManager = bookmark_manager
# (bool) whether to enable or disable encryption
# field missing in message: use driver default (should be False)
if encrypted is not None:
Expand Down Expand Up @@ -202,8 +199,8 @@ class BookmarksSupplierCompleted:
Pushes bookmarks for a given database to the Bookmark Manager.
"""

def __init__(self, requestId, bookmarks):
self.requestId = requestId
def __init__(self, request_id, bookmarks):
self.requestId = request_id
self.bookmarks = bookmarks


Expand All @@ -214,8 +211,35 @@ class BookmarksConsumerCompleted:
Signal the method call has finished
"""

def __init__(self, requestId):
self.requestId = requestId
def __init__(self, request_id):
self.requestId = request_id


class NewBookmarkManager:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have a method to close/destroy BMMs to avoid piling up resources in the backend?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will only have 20 lightweight objects in the backend. The log messages generated for this call in the JS backend concern me more. I already removed some log messages from the backend for resolving issues with the process being killed.

"""Instantiates a bookmark manager by calling the default factory.

Backend should respond with a BookmarkManager response.
"""

def __init__(self, initial_bookmarks,
bookmarks_supplier_registered, bookmarks_consumer_registered):
self.initialBookmarks = initial_bookmarks
self.bookmarksSupplierRegistered = bookmarks_supplier_registered
self.bookmarksConsumerRegistered = bookmarks_consumer_registered


class BookmarkManagerClose:
"""Destroy the bookmark manager in the backend and free the resources.

The driver-provided BookmarkManager implementation does not have a close
method. This message is an instruction solely for the backend to be able to
destroy the bookmark manager object when done testing it to free resources.

Backend should respond with a BookmarkManager response.
"""

def __init__(self, id):
self.id = id


class DomainNameResolutionCompleted:
Expand Down Expand Up @@ -258,7 +282,7 @@ class NewSession:

def __init__(self, driverId, accessMode, bookmarks=None,
database=None, fetchSize=None, impersonatedUser=None,
ignore_bookmark_manager=None):
bookmark_manager=None):
# Id of driver on backend that session should be created on
self.driverId = driverId
# Session accessmode: 'r' for read access and 'w' for write access.
Expand All @@ -268,8 +292,8 @@ def __init__(self, driverId, accessMode, bookmarks=None,
self.database = database
self.fetchSize = fetchSize
self.impersonatedUser = impersonatedUser
if ignore_bookmark_manager is not None:
self.ignoreBookmarkManager = ignore_bookmark_manager
if bookmark_manager is not None:
self.bookmarkManagerId = bookmark_manager.id


class SessionClose:
Expand Down
13 changes: 11 additions & 2 deletions nutkit/protocol/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ def __init__(self, id, address):
self.address = address


class BookmarkManager:
"""Represents a created Bookmark Manager."""

def __init__(self, id):
self.id = id


class BookmarksSupplierRequest:
"""
Represents a bookmark supplier in the Bookmark Manager.
Expand All @@ -81,9 +88,10 @@ class BookmarksSupplierRequest:
bookmarks for all databases.
"""

def __init__(self, id, database=None):
def __init__(self, id, bookmarkManagerId, database=None):
# id of the callback request
self.id = id
self.bookmark_manager_id = bookmarkManagerId
self.database = database


Expand All @@ -95,9 +103,10 @@ class BookmarksConsumerRequest:
to send the new bookmark set for a given database.
"""

def __init__(self, id, database, bookmarks):
def __init__(self, id, bookmarkManagerId, database, bookmarks):
# id of the callback request
self.id = id
self.bookmark_manager_id = bookmarkManagerId
self.database = database
self.bookmarks = bookmarks

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ A: HELLO {"{}": "*"}
S: SUCCESS {"type": "r"}
C: COMMIT
S: SUCCESS {"bookmark": "bm3"}
----
C: BEGIN {"tx_metadata": {"return_bookmark": "bm4"}, "[bookmarks]": "*", "[db]": "*"}
S: SUCCESS {}
C: RUN "RETURN 1 as n" {} {}
S: SUCCESS {"fields": ["n"]}
C: PULL {"n": 1000}
S: SUCCESS {"type": "r"}
C: COMMIT
S: SUCCESS {"bookmark": "bm4"}
----
C: BEGIN {"tx_metadata": {"order": "adb"}, "[bookmarks]": "*", "[db]": "*"}
S: SUCCESS {}
Expand Down
Loading