diff --git a/nutkit/protocol/feature.py b/nutkit/protocol/feature.py index 4f562b12b..aacabfd3b 100644 --- a/nutkit/protocol/feature.py +++ b/nutkit/protocol/feature.py @@ -11,14 +11,19 @@ class Feature(Enum): # a started job. All other connections should be re-established before # running the next job with them. OPT_AUTHORIZATION_EXPIRED_TREATMENT = 'AuthorizationExpiredTreatment' + # The driver caches connections (e.g., in a pool) and doesn't start a new + # one (with hand-shake, HELLO, etc.) for each query. + OPT_CONNECTION_REUSE = "Optimization:ConnectionReuse" + # The driver first tries to SUCCESSfully BEGIN a transaction before calling + # the user-defined transaction function. This way, the (potentially costly) + # transaction function is not started until a working transaction has been + # established. + OPT_EAGER_TX_BEGIN = "Optimization:EagerTransactionBegin" # Driver doesn't explicitly send message data that is the default value. # This conserves bandwidth. OPT_IMPLICIT_DEFAULT_ARGUMENTS = "Optimization:ImplicitDefaultArguments" # The driver sends no more than the strictly necessary RESET messages. OPT_MINIMAL_RESETS = "Optimization:MinimalResets" - # The driver caches connections (e.g., in a pool) and doesn't start a new - # one (with hand-shake, HELLO, etc.) for each query. - OPT_CONNECTION_REUSE = "Optimization:ConnectionReuse" # The driver doesn't wait for a SUCCESS after calling RUN but pipelines a # PULL right afterwards and consumes two messages after that. This saves a # full round-trip. @@ -33,21 +38,9 @@ class Feature(Enum): # as well. CONF_HINT_CON_RECV_TIMEOUT = "ConfHint:connection.recv_timeout_seconds" - # Temporary driver feature that will be removed when all official drivers - # have been unified in their behaviour of when they return a Result object. - # We aim for drivers to not providing a Result until the server replied with - # SUCCESS so that the result keys are already known and attached to the - # Result object without further waiting or communication with the server. - TMP_RESULT_KEYS = "Temporary:ResultKeys" - # Temporary driver feature that will be removed when all official driver - # backends have implemented all summary response fields. - TMP_FULL_SUMMARY = "Temporary:FullSummary" # Temporary driver feature that will be removed when all official driver # backends have implemented path and relationship types TMP_CYPHER_PATH_AND_RELATIONSHIP = "Temporary:CypherPathAndRelationship" - # Temporary driver feature that will be removed when all official driver - # backends have implemented the TransactionClose request - TMP_TRANSACTION_CLOSE = "Temporary:TransactionClose" # TODO Update this once the decision has been made. # Temporary driver feature. There is a pending decision on whether it should # be supported in all drivers or be removed from all of them. @@ -56,5 +49,17 @@ class Feature(Enum): # backends have implemented it. TMP_DRIVER_MAX_TX_RETRY_TIME = "Temporary:DriverMaxTxRetryTime" # Temporary driver feature that will be removed when all official driver + # backends have implemented all summary response fields. + TMP_FULL_SUMMARY = "Temporary:FullSummary" + # Temporary driver feature that will be removed when all official drivers + # have been unified in their behaviour of when they return a Result object. + # We aim for drivers to not providing a Result until the server replied with + # SUCCESS so that the result keys are already known and attached to the + # Result object without further waiting or communication with the server. + TMP_RESULT_KEYS = "Temporary:ResultKeys" + # Temporary driver feature that will be removed when all official driver # backends have implemented it. TMP_RESULT_LIST = "Temporary:ResultList" + # Temporary driver feature that will be removed when all official driver + # backends have implemented the TransactionClose request + TMP_TRANSACTION_CLOSE = "Temporary:TransactionClose" diff --git a/tests/stub/authorization/test_authorization.py b/tests/stub/authorization/test_authorization.py index 44c2f2c71..fceb8f8c6 100644 --- a/tests/stub/authorization/test_authorization.py +++ b/tests/stub/authorization/test_authorization.py @@ -158,11 +158,7 @@ def test_should_fail_with_auth_expired_on_begin_using_tx_run(self): try: tx = session.beginTransaction() # TODO: remove block when all drivers behave the same way - if get_driver_name() in ["python"]: - # driver waits with sending BEGIN until .run is called - # this pipelining saves time but exposes issues later - tx.run("cypher") - if get_driver_name() in ["javascript", "dotnet"]: + if get_driver_name() in ["javascript"]: tx.run("cypher").next() except types.DriverError as e: self.assert_is_authorization_error(error=e) diff --git a/tests/stub/disconnects/test_disconnects.py b/tests/stub/disconnects/test_disconnects.py index 93c43645a..3596be142 100644 --- a/tests/stub/disconnects/test_disconnects.py +++ b/tests/stub/disconnects/test_disconnects.py @@ -96,9 +96,9 @@ def test_disconnect_on_hello(self): self._driver.close() self._server.done() - expected_step = "after first next" - if self._driverName in ["go", "java", "dotnet", "python"]: - expected_step = "after run" + expected_step = "after run" + if self._driverName in ["javascript"]: + expected_step = "after first next" self.assertEqual(step, expected_step) def test_disconnect_after_hello(self): @@ -115,9 +115,9 @@ def test_disconnect_after_hello(self): self._driver.close() self._server.done() - expected_step = "after first next" - if self._driverName in ["go", "python", "java"]: - expected_step = "after run" + expected_step = "after run" + if self._driverName in ["dotnet", "javascript"]: + expected_step = "after first next" self.assertEqual(step, expected_step) def test_disconnect_session_on_run(self): @@ -130,10 +130,9 @@ def test_disconnect_session_on_run(self): self._driver.close() self._server.done() - expected_step = "after first next" - if self._driverName in ["go", "python", "java"]: - # Go reports this error earlier - expected_step = "after run" + expected_step = "after run" + if self._driverName in ["dotnet", "javascript"]: + expected_step = "after first next" self.assertEqual(step, expected_step) def test_disconnect_on_pull(self): @@ -175,7 +174,7 @@ def test_disconnect_on_tx_begin(self): self._server.done() expected_step = "after begin" - if self._driverName in ["python", "go"]: + if self._driverName in ["go"]: expected_step = "after run" elif self._driverName in ["javascript"]: expected_step = "after first next" diff --git a/tests/stub/routing/test_routing_v4x3.py b/tests/stub/routing/test_routing_v4x3.py index 4b0ff51d8..b2af56750 100644 --- a/tests/stub/routing/test_routing_v4x3.py +++ b/tests/stub/routing/test_routing_v4x3.py @@ -521,9 +521,6 @@ def work(tx): @driver_feature(types.Feature.OPT_PULL_PIPELINING) def test_should_retry_write_until_success_with_leader_change_using_tx_function( self): - # TODO remove this block once all languages work - if get_driver_name() in ['python']: - self.skipTest("requires investigation") self._should_retry_write_until_success_with_leader_change_using_tx_function( "writer_tx_with_unexpected_interruption_on_pipelined_pull.script" ) @@ -531,7 +528,7 @@ def test_should_retry_write_until_success_with_leader_change_using_tx_function( def test_should_retry_write_until_success_with_leader_change_on_run_using_tx_function( self): # TODO remove this block once all languages work - if get_driver_name() in ['javascript', 'python']: + if get_driver_name() in ['javascript']: self.skipTest("requires investigation") self._should_retry_write_until_success_with_leader_change_using_tx_function( "writer_tx_with_unexpected_interruption_on_run.script" diff --git a/tests/stub/tx_run/scripts/router_switch_server.script b/tests/stub/tx_run/scripts/router_switch_server.script new file mode 100644 index 000000000..29bbf88f4 --- /dev/null +++ b/tests/stub/tx_run/scripts/router_switch_server.script @@ -0,0 +1,11 @@ +!: BOLT 4.4 +!: AUTO RESET + +A: HELLO {"{}": "*"} +C: ROUTE "*" "*" "*" +S: SUCCESS { "rt": { "ttl": 1000, "servers": [{"addresses": ["#HOST#:9000"], "role":"ROUTE"}, {"addresses": ["#HOST#:9010"], "role":"READ"}, {"addresses": ["#HOST#:9010"], "role":"WRITE"}]}} +{* + C: ROUTE "*" "*" "*" + S: SUCCESS { "rt": { "ttl": 1000, "servers": [{"addresses": ["#HOST#:9000"], "role":"ROUTE"}, {"addresses": ["#HOST#:9011"], "role":"READ"}, {"addresses": ["#HOST#:9011"], "role":"WRITE"}]}} +*} +?: GOODBYE diff --git a/tests/stub/tx_run/scripts/tx_commit.script b/tests/stub/tx_run/scripts/tx_commit.script new file mode 100644 index 000000000..dc315f62d --- /dev/null +++ b/tests/stub/tx_run/scripts/tx_commit.script @@ -0,0 +1,20 @@ +!: BOLT 4.4 + +A: HELLO {"{}": "*"} +*: RESET +C: BEGIN {"{}": "*"} +S: SUCCESS {} +C: RUN {"U": "*"} {"{}": "*"} {"{}": "*"} +S: SUCCESS {"fields": ["n"], "qid": 1} +{{ + C: PULL {"n": {"Z": "*"}, "[qid]": -1} +---- + C: PULL {"n": {"Z": "*"}, "qid": 1} +}} +S: RECORD [1] + RECORD [2] + SUCCESS {"type": "r"} +C: COMMIT +S: SUCCESS {"bookmark": "neo4j:bookmark:v1:tx424242"} +*: RESET +?: GOODBYE diff --git a/tests/stub/tx_run/scripts/tx_disconnect_on_begin.script b/tests/stub/tx_run/scripts/tx_disconnect_on_begin.script new file mode 100644 index 000000000..f84e75beb --- /dev/null +++ b/tests/stub/tx_run/scripts/tx_disconnect_on_begin.script @@ -0,0 +1,6 @@ +!: BOLT 4.4 + +A: HELLO {"{}": "*"} +*: RESET +C: BEGIN {"{}": "*"} +S: diff --git a/tests/stub/tx_run/scripts/tx_error_on_begin.script b/tests/stub/tx_run/scripts/tx_error_on_begin.script new file mode 100644 index 000000000..f3d50e4d4 --- /dev/null +++ b/tests/stub/tx_run/scripts/tx_error_on_begin.script @@ -0,0 +1,8 @@ +!: BOLT 4.4 + +A: HELLO {"{}": "*"} +*: RESET +C: BEGIN {"{}": "*"} +S: FAILURE {"code": "Neo.ClientError.MadeUp.Code", "message": "Something went wrong..."} ++: RESET +?: GOODBYE diff --git a/tests/stub/tx_run/scripts/tx_pull_then_rollback.script b/tests/stub/tx_run/scripts/tx_pull_then_rollback.script index 4f12b96a0..d91ad1ee6 100644 --- a/tests/stub/tx_run/scripts/tx_pull_then_rollback.script +++ b/tests/stub/tx_run/scripts/tx_pull_then_rollback.script @@ -4,7 +4,7 @@ A: HELLO {"{}": "*"} *: RESET C: BEGIN {"{}": "*"} S: SUCCESS {} -C: RUN {"U": "*"}{"{}": "*"} {"{}": "*"} +C: RUN {"U": "*"} {"{}": "*"} {"{}": "*"} S: SUCCESS {"fields": ["n"]} C: PULL {"n": {"Z": "*"}, "[qid]": -1} S: RECORD [1] diff --git a/tests/stub/tx_run/test_tx_run.py b/tests/stub/tx_run/test_tx_run.py index d0ae89c93..1f1a419d1 100644 --- a/tests/stub/tx_run/test_tx_run.py +++ b/tests/stub/tx_run/test_tx_run.py @@ -1,6 +1,7 @@ from nutkit.frontend import Driver from nutkit import protocol as types from tests.shared import ( + driver_feature, get_driver_name, TestkitTestCase, ) @@ -10,18 +11,29 @@ class TestTxRun(TestkitTestCase): def setUp(self): super().setUp() - self._server = StubServer(9001) - uri = "bolt://%s" % self._server.address - self._driver = Driver(self._backend, uri, - types.AuthorizationToken(scheme="basic")) + self._router = StubServer(9000) + self._server1 = StubServer(9010) + self._server2 = StubServer(9011) self._session = None def tearDown(self): if self._session is not None: self._session.close() - self._server.done() + self._router.reset() + self._server1.reset() + self._server2.reset() super().tearDown() + def _create_direct_driver(self): + uri = "bolt://%s" % self._server1.address + self._driver = Driver(self._backend, uri, + types.AuthorizationToken(scheme="basic")) + + def _create_routing_driver(self): + uri = "neo4j://%s" % self._router.address + self._driver = Driver(self._backend, uri, + types.AuthorizationToken(scheme="basic")) + def test_rollback_tx_on_session_close_untouched_result(self): # TODO: remove this block once all languages work if get_driver_name() in ["go"]: @@ -29,7 +41,8 @@ def test_rollback_tx_on_session_close_untouched_result(self): "pending transaction") if get_driver_name() in ["javascript"]: self.skipTest("Driver requires result.next() to send PULL") - self._server.start( + self._create_direct_driver() + self._server1.start( path=self.script_path("tx_discard_then_rollback.script") ) self._session = self._driver.session("r", fetchSize=2) @@ -38,7 +51,7 @@ def test_rollback_tx_on_session_close_untouched_result(self): # closing session while tx is open and result is not consumed at all self._session.close() self._session = None - self._server.done() + self._server1.done() def test_rollback_tx_on_session_close_unfinished_result(self): # TODO: remove this block once all languages work @@ -47,9 +60,10 @@ def test_rollback_tx_on_session_close_unfinished_result(self): "pending transaction") if get_driver_name() in ["javascript"]: self.skipTest("Sends RESET instead of ROLLBACK.") - self._server.start( + self._server1.start( path=self.script_path("tx_discard_then_rollback.script") ) + self._create_direct_driver() self._session = self._driver.session("r", fetchSize=2) tx = self._session.beginTransaction() result = tx.run("RETURN 1 AS n") @@ -57,7 +71,7 @@ def test_rollback_tx_on_session_close_unfinished_result(self): # closing session while tx is open and result is not fully consumed self._session.close() self._session = None - self._server.done() + self._server1.done() def test_rollback_tx_on_session_close_consumed_result(self): # TODO: remove this block once all languages work @@ -66,9 +80,10 @@ def test_rollback_tx_on_session_close_consumed_result(self): "pending transaction") if get_driver_name() in ["javascript"]: self.skipTest("Driver sends RESET instead of ROLLBACK") - self._server.start( + self._server1.start( path=self.script_path("tx_discard_then_rollback.script") ) + self._create_direct_driver() self._session = self._driver.session("r", fetchSize=2) tx = self._session.beginTransaction() result = tx.run("RETURN 1 AS n") @@ -76,7 +91,7 @@ def test_rollback_tx_on_session_close_consumed_result(self): # closing session while tx is open and result has been manually consumed self._session.close() self._session = None - self._server.done() + self._server1.done() def test_rollback_tx_on_session_close_finished_result(self): # TODO: remove this block once all languages work @@ -85,7 +100,8 @@ def test_rollback_tx_on_session_close_finished_result(self): "pending transaction") if get_driver_name() in ["javascript"]: self.skipTest("Driver sends RESET instead of ROLLBACK") - self._server.start( + self._create_direct_driver() + self._server1.start( path=self.script_path("tx_pull_then_rollback.script") ) self._session = self._driver.session("r", fetchSize=2) @@ -95,4 +111,86 @@ def test_rollback_tx_on_session_close_finished_result(self): # closing session while tx is open self._session.close() self._session = None - self._server.done() + self._server1.done() + + def _eager_tx_func_run(self, script, routing=False): + if routing: + self._create_routing_driver() + self._router.start( + path=self.script_path("router_switch_server.script"), + vars={"#HOST#": self._server1.host} + ) + self._server2.start(path=self.script_path("tx_commit.script")) + else: + self._create_direct_driver() + self._server1.start(path=self.script_path(script)) + + tx_func_count = 0 + + def work(tx): + nonlocal tx_func_count + tx_func_count += 1 + list(tx.run("RETURN 1 AS n")) + + self._session = self._driver.session("w") + exc = None + try: + self._session.writeTransaction(work) + except types.DriverError as e: + exc = e + + self._session.close() + self._session = None + + return exc, tx_func_count + + @driver_feature(types.Feature.OPT_EAGER_TX_BEGIN) + def test_eager_begin_on_tx_func_run_with_disconnect_on_begin(self): + exc, tx_func_count = self._eager_tx_func_run( + "tx_disconnect_on_begin.script", routing=True + ) + # Driver should retry tx on disconnect after BEGIN and call the tx func + # exactly once (after the disconnect). The disconnect should make the + # driver fetch a new routing table which will point to server2 the + # second time. This server will let the tx succeed. + self.assertIsNone(exc) + self.assertEqual(tx_func_count, 1) + self._router.done() + self.assertEqual(self._router.count_requests("ROUTE"), 2) + self._server1.done() + self._server2.done() + + @driver_feature(types.Feature.OPT_EAGER_TX_BEGIN) + def test_eager_begin_on_tx_func_run_with_error_on_begin(self): + exc, tx_func_count = self._eager_tx_func_run("tx_error_on_begin.script", + routing=False) + # Driver should raise error on non-transient error after BEGIN, and + # never call the tx func. + self.assertEqual("Neo.ClientError.MadeUp.Code", exc.code) + self.assertEqual(tx_func_count, 0) + self._server1.done() + + def _eager_tx_run(self, script): + self._create_direct_driver() + self._server1.start(path=self.script_path(script)) + + self._session = self._driver.session("w") + with self.assertRaises(types.DriverError) as exc: + self._session.beginTransaction() + + self._session.close() + self._session = None + + return exc.exception + + @driver_feature(types.Feature.OPT_EAGER_TX_BEGIN) + def test_eager_begin_on_tx_run_with_disconnect_on_begin(self): + exc = self._eager_tx_run("tx_disconnect_on_begin.script") + if get_driver_name() in ["python"]: + self.assertEqual("", + exc.errorType) + + @driver_feature(types.Feature.OPT_EAGER_TX_BEGIN) + def test_eager_begin_on_tx_run_with_error_on_begin(self): + exc = self._eager_tx_run("tx_error_on_begin.script") + self.assertEqual("Neo.ClientError.MadeUp.Code", exc.code)