Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ See also https://github.com/neo4j/neo4j-python-driver/wiki for a full changelog.
- `ResultSummary.gql_status_objects`
- `neo4j.GqlStatusObject`
- (`neo4j.exceptions.GqlError`, `neo4j.exceptions.GqlErrorClassification`)
- On failed liveness check (s. `liveness_check_timeout` configuration option), the driver will no longer remove the
remote from the cached routing tables, but only close the connection under test.
This aligns the driver with the other official Neo4j drivers.


## Version 5.28
Expand Down
8 changes: 7 additions & 1 deletion src/neo4j/_addressing.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,9 @@ def port_number(self) -> int:
pass
raise error_cls(f"Unknown port value {self[1]!r}")

def __reduce__(self):
return Address, (tuple(self),)


class IPv4Address(Address):
"""
Expand Down Expand Up @@ -352,12 +355,15 @@ def _host_name(self) -> str:
def _unresolved(self) -> Address:
return super().__new__(Address, (self._host_name, *self[1:]))

def __new__(cls, iterable, *, host_name: str) -> ResolvedAddress:
def __new__(cls, iterable, host_name: str) -> ResolvedAddress:
new = super().__new__(cls, iterable)
new = t.cast(ResolvedAddress, new)
new._unresolved_host_name = host_name
return new

def __reduce__(self):
return ResolvedAddress, (tuple(self), self._unresolved_host_name)


class ResolvedIPv4Address(IPv4Address, ResolvedAddress):
pass
Expand Down
16 changes: 15 additions & 1 deletion src/neo4j/_async/io/_bolt.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ class AsyncBolt:
_closed = False
_defunct = False

# Flag if the connection is currently performing a liveness check.
_liveness_check = False

#: The pool of which this connection is a member
pool = None

Expand Down Expand Up @@ -758,6 +761,13 @@ async def reset(self, dehydration_hooks=None, hydration_hooks=None):
type understood by packstream and are free to return anything.
"""

async def liveness_check(self):
self._liveness_check = True
try:
await self.reset()
finally:
self._liveness_check = False

@abc.abstractmethod
def goodbye(self, dehydration_hooks=None, hydration_hooks=None):
"""
Expand Down Expand Up @@ -934,7 +944,11 @@ async def _set_defunct(self, message, error=None, silent=False):
# remove the connection from the pool, nor to try to close the
# connection again.
await self.close()
if self.pool and not self._get_server_state_manager().failed():
if (
not self._liveness_check
and self.pool
and not self._get_server_state_manager().failed()
):
await self.pool.deactivate(address=self.unresolved_address)

# Iterate through the outstanding responses, and if any correspond
Expand Down
2 changes: 1 addition & 1 deletion src/neo4j/_async/io/_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ async def health_check(connection_, deadline_):
"[#%04X] _: <POOL> liveness check",
connection_.local_port,
)
await connection_.reset()
await connection_.liveness_check()
except (OSError, ServiceUnavailable, SessionExpired):
return False
return True
Expand Down
11 changes: 11 additions & 0 deletions src/neo4j/_routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,14 @@ def update(self, new_routing_table):

def servers(self):
return set(self.routers) | set(self.writers) | set(self.readers)

def __eq__(self, other):
if not isinstance(other, RoutingTable):
return NotImplemented
return (
self.database == other.database
and self.routers == other.routers
and self.readers == other.readers
and self.writers == other.writers
and self.ttl == other.ttl
)
16 changes: 15 additions & 1 deletion src/neo4j/_sync/io/_bolt.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/neo4j/_sync/io/_pool.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions testkitbackend/test_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@
"'neo4j.datatypes.test_temporal_types.TestDataTypes.test_should_echo_all_timezone_ids'":
"test_subtest_skips.dt_conversion",
"'neo4j.datatypes.test_temporal_types.TestDataTypes.test_date_time_cypher_created_tz_id'":
"test_subtest_skips.tz_id",
"stub\\.routing\\.test_routing_v[0-9x]+\\.RoutingV[0-9x]+\\.test_should_drop_connections_failing_liveness_check":
"Liveness check error handling is not (yet) unified: https://github.com/neo-technology/drivers-adr/pull/83"
"test_subtest_skips.tz_id"
},
"features": {
"Feature:API:BookmarkManager": true,
Expand Down
1 change: 1 addition & 0 deletions tests/unit/async_/fixtures/fake_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def __init__(self, *args, **kwargs):
self.attach_mock(
mock.AsyncMock(spec=AsyncAuthManager), "auth_manager"
)
self.attach_mock(mock.AsyncMock(), "liveness_check")
self.unresolved_address = next(iter(args), "localhost")

self.callbacks = []
Expand Down
4 changes: 4 additions & 0 deletions tests/unit/async_/io/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ async def send_message(self, tag, *fields):
self._outbox.append_message(tag, fields, None)
await self._outbox.flush()

def assert_no_more_messages(self):
assert self._messages
assert not self.recv_buffer


class AsyncFakeSocketPair:
def __init__(self, address, packer_cls=None, unpacker_cls=None):
Expand Down
104 changes: 104 additions & 0 deletions tests/unit/async_/io/test_class_bolt_any.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# Copyright (c) "Neo4j"
# Neo4j Sweden AB [https://neo4j.com]
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import pytest

import neo4j
from neo4j._async.io._bolt3 import AsyncBolt3
from neo4j._async.io._bolt4 import (
AsyncBolt4x0,
AsyncBolt4x1,
AsyncBolt4x2,
AsyncBolt4x3,
)
from neo4j._async.io._bolt5 import (
AsyncBolt5x0,
AsyncBolt5x1,
AsyncBolt5x2,
AsyncBolt5x3,
AsyncBolt5x4,
AsyncBolt5x5,
AsyncBolt5x6,
AsyncBolt5x7,
AsyncBolt5x8,
)
from neo4j.exceptions import ServiceUnavailable

from ...._async_compat import mark_async_test


@pytest.fixture(
params=[
AsyncBolt3,
AsyncBolt4x0,
AsyncBolt4x1,
AsyncBolt4x2,
AsyncBolt4x3,
AsyncBolt5x0,
AsyncBolt5x1,
AsyncBolt5x2,
AsyncBolt5x3,
AsyncBolt5x4,
AsyncBolt5x5,
AsyncBolt5x6,
AsyncBolt5x7,
AsyncBolt5x8,
]
)
def bolt_cls(request):
return request.param


@mark_async_test
async def test_liveness_check_calls_reset(bolt_cls, fake_socket_pair):
address = neo4j.Address(("127.0.0.1", 7687))
sockets = fake_socket_pair(
address,
packer_cls=AsyncBolt5x8.PACKER_CLS,
unpacker_cls=AsyncBolt5x8.UNPACKER_CLS,
)
connection = bolt_cls(address, sockets.client, 0)

await sockets.server.send_message(b"\x70", {})
await connection.liveness_check()
tag, fields = await sockets.server.pop_message()
assert tag == b"\x0f"
assert len(fields) == 0
sockets.server.assert_no_more_messages()


@mark_async_test
async def test_failed_liveness_check_does_not_call_pool(
bolt_cls, fake_socket_pair, mocker
):
def broken_recv_into(*args, **kwargs):
raise OSError("nope")

address = neo4j.Address(("127.0.0.1", 7687))
sockets = fake_socket_pair(
address,
packer_cls=bolt_cls.PACKER_CLS,
unpacker_cls=bolt_cls.UNPACKER_CLS,
)
connection = bolt_cls(address, sockets.client, 0)
pool_mock = mocker.AsyncMock()
connection.pool = pool_mock
sockets.client.recv_into = broken_recv_into

with pytest.raises(ServiceUnavailable):
await connection.liveness_check()

assert not pool_mock.method_calls
10 changes: 5 additions & 5 deletions tests/unit/async_/io/test_direct.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ async def test_liveness_check(
else:
cx1.is_idle_for.assert_not_called()
await pool.release(cx1)
cx1.reset.assert_not_called()
cx1.liveness_check.assert_not_called()

# simulate after timeout
cx1.is_idle_for.return_value = True
Expand All @@ -271,13 +271,13 @@ async def test_liveness_check(
assert cx2 is cx1
if effective_timeout is not None:
cx1.is_idle_for.assert_called_once_with(effective_timeout)
cx1.reset.assert_awaited_once()
cx1.liveness_check.assert_awaited_once()
else:
cx1.is_idle_for.assert_not_called()
cx1.reset.assert_not_called()
cx1.reset.reset_mock()
cx1.liveness_check.assert_not_called()
cx1.liveness_check.reset_mock()
await pool.release(cx1)
cx1.reset.assert_not_called()
cx1.liveness_check.assert_not_called()


@pytest.mark.parametrize("unprepared", (True, False, None))
Expand Down
Loading