diff --git a/driver/src/main/java/org/neo4j/driver/exceptions/TransactionTerminatedException.java b/driver/src/main/java/org/neo4j/driver/exceptions/TransactionTerminatedException.java new file mode 100644 index 0000000000..71ab205efb --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/exceptions/TransactionTerminatedException.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.exceptions; + +import java.io.Serial; + +/** + * Indicates that the transaction has been terminated. + * @since 5.10 + */ +public class TransactionTerminatedException extends ClientException { + @Serial + private static final long serialVersionUID = 7639191706067500206L; + + /** + * Creates a new instance. + * @param message the message + */ + public TransactionTerminatedException(String message) { + super(message); + } + + /** + * Creates a new instance. + * @param code the code + * @param message the message + */ + public TransactionTerminatedException(String code, String message) { + super(code, message); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/InternalTransaction.java index 37e7b3a6ef..5af91e1a71 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/InternalTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/InternalTransaction.java @@ -66,6 +66,22 @@ public boolean isOpen() { return tx.isOpen(); } + /** + * THIS IS A PRIVATE API + *

+ * Terminates the transaction by sending the Bolt {@code RESET} message and waiting for its response as long as the + * transaction has not already been terminated, is not closed or closing. + * + * @since 5.10 + * @throws org.neo4j.driver.exceptions.ClientException if the transaction is closed or is closing + * @see org.neo4j.driver.exceptions.TransactionTerminatedException + */ + public void terminate() { + Futures.blockingGet( + tx.terminateAsync(), + () -> terminateConnectionOnThreadInterrupt("Thread interrupted while terminating the transaction")); + } + private void terminateConnectionOnThreadInterrupt(String reason) { tx.connection().terminateAndRelease(reason); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkConnection.java index 5e4fb4883d..e96962ef13 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkConnection.java @@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.neo4j.driver.Logger; import org.neo4j.driver.Logging; +import org.neo4j.driver.exceptions.Neo4jException; import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.async.connection.ChannelAttributes; import org.neo4j.driver.internal.async.inbound.ConnectionReadTimeoutHandler; @@ -146,9 +147,9 @@ public void writeAndFlush(Message message1, ResponseHandler handler1, Message me } @Override - public CompletionStage reset() { + public CompletionStage reset(Neo4jException error) { CompletableFuture result = new CompletableFuture<>(); - ResetResponseHandler handler = new ResetResponseHandler(messageDispatcher, result); + ResetResponseHandler handler = new ResetResponseHandler(messageDispatcher, result, error); writeResetMessageIfNeeded(handler, true); return result; } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java index f57df5218e..b4c3e97ac0 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java @@ -43,6 +43,7 @@ import org.neo4j.driver.async.ResultCursor; import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.exceptions.TransactionNestingException; +import org.neo4j.driver.exceptions.TransactionTerminatedException; import org.neo4j.driver.internal.DatabaseBookmark; import org.neo4j.driver.internal.DatabaseName; import org.neo4j.driver.internal.FailableCursor; @@ -175,7 +176,8 @@ public CompletionStage resetAsync() { return existingTransactionOrNull() .thenAccept(tx -> { if (tx != null) { - tx.markTerminated(null); + tx.markTerminated(new TransactionTerminatedException( + "The transaction has been explicitly terminated by the driver")); } }) .thenCompose(ignore -> connectionStage) diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java index 494f845bfa..ff82aa6602 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java @@ -18,6 +18,7 @@ */ package org.neo4j.driver.internal.async; +import static java.util.concurrent.CompletableFuture.completedFuture; import static org.neo4j.driver.internal.util.Futures.asCompletionException; import static org.neo4j.driver.internal.util.Futures.combineErrors; import static org.neo4j.driver.internal.util.Futures.completedWithNull; @@ -44,6 +45,7 @@ import org.neo4j.driver.exceptions.AuthorizationExpiredException; import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.exceptions.ConnectionReadTimeoutException; +import org.neo4j.driver.exceptions.TransactionTerminatedException; import org.neo4j.driver.internal.DatabaseBookmark; import org.neo4j.driver.internal.cursor.AsyncResultCursor; import org.neo4j.driver.internal.cursor.RxResultCursor; @@ -210,10 +212,14 @@ private void ensureCanRunQueries() { } else if (state == State.ROLLED_BACK) { throw new ClientException("Cannot run more queries in this transaction, it has been rolled back"); } else if (state == State.TERMINATED) { - throw new ClientException( - "Cannot run more queries in this transaction, " - + "it has either experienced an fatal error or was explicitly terminated", - causeOfTermination); + if (causeOfTermination instanceof TransactionTerminatedException transactionTerminatedException) { + throw transactionTerminatedException; + } else { + throw new ClientException( + "Cannot run more queries in this transaction, " + + "it has either experienced an fatal error or was explicitly terminated", + causeOfTermination); + } } }); } @@ -319,20 +325,21 @@ private CompletionStage closeAsync(boolean commit, boolean completeWithNul return stage; } - /** - * Marks transaction as terminated and sends {@code RESET} message over allocated connection. - *

- * THIS METHOD IS NOT PART OF PUBLIC API. This method may be changed or removed at any moment in time. - * - * @return {@code RESET} response stage - */ - public CompletionStage interruptAsync() { + public CompletionStage terminateAsync() { return executeWithLock(lock, () -> { - if (interruptStage == null) { - markTerminated(null); - interruptStage = connection.reset(); + if (!isOpen() || commitFuture != null || rollbackFuture != null) { + return failedFuture(new ClientException("Can't terminate closed or closing transaction")); + } else { + if (state == State.TERMINATED) { + return interruptStage != null ? interruptStage : completedFuture(null); + } else { + var error = new TransactionTerminatedException( + "The transaction has been explicitly terminated by the driver"); + markTerminated(error); + interruptStage = connection.reset(error); + return interruptStage; + } } - return interruptStage; }); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/connection/DirectConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/connection/DirectConnection.java index 010618b4a8..0474466c3f 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/connection/DirectConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/connection/DirectConnection.java @@ -20,6 +20,7 @@ import java.util.concurrent.CompletionStage; import org.neo4j.driver.AccessMode; +import org.neo4j.driver.exceptions.Neo4jException; import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.DatabaseName; import org.neo4j.driver.internal.DirectConnectionProvider; @@ -84,8 +85,8 @@ public void writeAndFlush(Message message1, ResponseHandler handler1, Message me } @Override - public CompletionStage reset() { - return delegate.reset(); + public CompletionStage reset(Neo4jException error) { + return delegate.reset(error); } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/connection/RoutingConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/connection/RoutingConnection.java index fec880ace0..35bb0ceb9c 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/connection/RoutingConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/connection/RoutingConnection.java @@ -20,6 +20,7 @@ import java.util.concurrent.CompletionStage; import org.neo4j.driver.AccessMode; +import org.neo4j.driver.exceptions.Neo4jException; import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.DatabaseName; import org.neo4j.driver.internal.RoutingErrorHandler; @@ -84,8 +85,8 @@ public void writeAndFlush(Message message1, ResponseHandler handler1, Message me } @Override - public CompletionStage reset() { - return delegate.reset(); + public CompletionStage reset(Neo4jException error) { + return delegate.reset(error); } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java index 0960460ed7..d270258d5e 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java @@ -28,6 +28,7 @@ import java.util.Arrays; import java.util.LinkedList; import java.util.Map; +import java.util.Optional; import java.util.Queue; import org.neo4j.driver.Logger; import org.neo4j.driver.Logging; @@ -36,6 +37,7 @@ import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.exceptions.TokenExpiredException; import org.neo4j.driver.exceptions.TokenExpiredRetryableException; +import org.neo4j.driver.internal.handlers.PullAllResponseHandler; import org.neo4j.driver.internal.handlers.ResetResponseHandler; import org.neo4j.driver.internal.logging.ChannelActivityLogger; import org.neo4j.driver.internal.logging.ChannelErrorLogger; @@ -43,6 +45,7 @@ import org.neo4j.driver.internal.security.StaticAuthTokenManager; import org.neo4j.driver.internal.spi.ResponseHandler; import org.neo4j.driver.internal.util.ErrorUtil; +import org.neo4j.driver.internal.value.BooleanValue; public class InboundMessageDispatcher implements ResponseMessageHandler { private final Channel channel; @@ -88,7 +91,19 @@ public void handleSuccessMessage(Map meta) { log.debug("S: SUCCESS %s", meta); invokeBeforeLastHandlerHook(HandlerHook.MessageType.SUCCESS); ResponseHandler handler = removeHandler(); - handler.onSuccess(meta); + getResetResponseHandler() + .flatMap(ResetResponseHandler::error) + .ifPresentOrElse( + error -> { + if (handler instanceof PullAllResponseHandler + && meta.getOrDefault("has_more", BooleanValue.FALSE) + .asBoolean()) { + handler.onFailure(error); + } else { + handler.onSuccess(meta); + } + }, + () -> handler.onSuccess(meta)); } @Override @@ -146,22 +161,32 @@ public void handleFailureMessage(String code, String message) { public void handleIgnoredMessage() { log.debug("S: IGNORED"); - ResponseHandler handler = removeHandler(); + var handler = removeHandler(); Throwable error; if (currentError != null) { error = currentError; } else { - log.warn( - "Received IGNORED message for handler %s but error is missing and RESET is not in progress. " - + "Current handlers %s", - handler, handlers); - - error = new ClientException("Database ignored the request"); + var resetHandlerOpt = getResetResponseHandler(); + if (resetHandlerOpt.isEmpty()) { + log.warn( + "Received IGNORED message for handler %s but error is missing and RESET is not in progress. Current handlers %s", + handler, handlers); + } + error = resetHandlerOpt + .flatMap(ResetResponseHandler::error) + .orElseGet(() -> new ClientException("Database ignored the request")); } handler.onFailure(error); } + private Optional getResetResponseHandler() { + return handlers.stream() + .filter(nextHandler -> nextHandler instanceof ResetResponseHandler) + .map(nextHandler -> (ResetResponseHandler) nextHandler) + .findFirst(); + } + public void handleChannelInactive(Throwable cause) { // report issue if the connection has not been terminated as a result of a graceful shutdown request from its // parent pool diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java index 82cd04d207..db8fa27777 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java @@ -19,22 +19,33 @@ package org.neo4j.driver.internal.handlers; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.neo4j.driver.Value; +import org.neo4j.driver.exceptions.Neo4jException; import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; import org.neo4j.driver.internal.spi.ResponseHandler; public class ResetResponseHandler implements ResponseHandler { private final InboundMessageDispatcher messageDispatcher; private final CompletableFuture completionFuture; + private final Neo4jException error; public ResetResponseHandler(InboundMessageDispatcher messageDispatcher) { this(messageDispatcher, null); } public ResetResponseHandler(InboundMessageDispatcher messageDispatcher, CompletableFuture completionFuture) { + this(messageDispatcher, completionFuture, null); + } + + public ResetResponseHandler( + InboundMessageDispatcher messageDispatcher, + CompletableFuture completionFuture, + Neo4jException error) { this.messageDispatcher = messageDispatcher; this.completionFuture = completionFuture; + this.error = error; } @Override @@ -52,6 +63,10 @@ public final void onRecord(Value[] fields) { throw new UnsupportedOperationException(); } + public Optional error() { + return Optional.ofNullable(error); + } + private void resetCompleted(boolean success) { messageDispatcher.clearCurrentError(); if (completionFuture != null) { diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveTransaction.java index c5a089cded..b8d8ccb70c 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveTransaction.java @@ -61,14 +61,16 @@ public Publisher run(Query query) { } /** - * Marks transaction as terminated and sends {@code RESET} message over allocated connection. + * THIS IS A PRIVATE API *

- * THIS METHOD IS NOT PART OF PUBLIC API. This method may be changed or removed at any moment in time. + * Terminates the transaction by sending the Bolt {@code RESET} message and waiting for its response as long as the + * transaction has not already been terminated, is not closed or closing. * - * @return {@code RESET} response publisher + * @return completion publisher (the {@code RESET} completion publisher if the message was sent) + * @since 5.10 */ - public Publisher interrupt() { - return publisherToFlowPublisher(Mono.fromCompletionStage(tx.interruptAsync())); + public Publisher terminate() { + return publisherToFlowPublisher(Mono.fromCompletionStage(tx.terminateAsync())); } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactivestreams/InternalReactiveTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/reactivestreams/InternalReactiveTransaction.java index b91419e6cf..01dabf8e8a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactivestreams/InternalReactiveTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactivestreams/InternalReactiveTransaction.java @@ -60,14 +60,16 @@ public Publisher run(Query query) { } /** - * Marks transaction as terminated and sends {@code RESET} message over allocated connection. + * THIS IS A PRIVATE API *

- * THIS METHOD IS NOT PART OF PUBLIC API. This method may be changed or removed at any moment in time. + * Terminates the transaction by sending the Bolt {@code RESET} message and waiting for its response as long as the + * transaction has not already been terminated, is not closed or closing. * - * @return {@code RESET} response publisher + * @return completion publisher (the {@code RESET} completion publisher if the message was sent) + * @since 5.10 */ - public Publisher interrupt() { - return Mono.fromCompletionStage(tx.interruptAsync()); + public Publisher terminate() { + return Mono.fromCompletionStage(tx.terminateAsync()); } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java b/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java index 54fe232953..7fa37cc317 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java @@ -22,6 +22,7 @@ import java.util.concurrent.CompletionStage; import org.neo4j.driver.AccessMode; +import org.neo4j.driver.exceptions.Neo4jException; import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.DatabaseName; import org.neo4j.driver.internal.messaging.BoltProtocol; @@ -42,7 +43,11 @@ public interface Connection { void writeAndFlush(Message message1, ResponseHandler handler1, Message message2, ResponseHandler handler2); - CompletionStage reset(); + default CompletionStage reset() { + return reset(null); + } + + CompletionStage reset(Neo4jException error); CompletionStage release(); diff --git a/driver/src/main/java/org/neo4j/driver/internal/util/ErrorUtil.java b/driver/src/main/java/org/neo4j/driver/internal/util/ErrorUtil.java index 4a2f2a18f5..615cc93ef6 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/util/ErrorUtil.java +++ b/driver/src/main/java/org/neo4j/driver/internal/util/ErrorUtil.java @@ -32,6 +32,7 @@ import org.neo4j.driver.exceptions.SecurityException; import org.neo4j.driver.exceptions.ServiceUnavailableException; import org.neo4j.driver.exceptions.TokenExpiredException; +import org.neo4j.driver.exceptions.TransactionTerminatedException; import org.neo4j.driver.exceptions.TransientException; public final class ErrorUtil { @@ -72,6 +73,8 @@ public static Neo4jException newNeo4jError(String code, String message) { } else { if (code.equalsIgnoreCase("Neo.ClientError.Database.DatabaseNotFound")) { return new FatalDiscoveryException(code, message); + } else if (code.equalsIgnoreCase("Neo.ClientError.Transaction.Terminated")) { + return new TransactionTerminatedException(code, message); } else { return new ClientException(code, message); } diff --git a/driver/src/test/java/org/neo4j/driver/integration/TransactionIT.java b/driver/src/test/java/org/neo4j/driver/integration/TransactionIT.java index 64b9e8b372..ea9cb1c399 100644 --- a/driver/src/test/java/org/neo4j/driver/integration/TransactionIT.java +++ b/driver/src/test/java/org/neo4j/driver/integration/TransactionIT.java @@ -25,6 +25,7 @@ import static org.hamcrest.junit.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; @@ -35,6 +36,8 @@ import java.util.List; import java.util.Map; import java.util.function.Consumer; +import java.util.stream.LongStream; +import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.neo4j.driver.Config; @@ -46,6 +49,8 @@ import org.neo4j.driver.Value; import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.exceptions.ServiceUnavailableException; +import org.neo4j.driver.exceptions.TransactionTerminatedException; +import org.neo4j.driver.internal.InternalTransaction; import org.neo4j.driver.internal.security.SecurityPlanImpl; import org.neo4j.driver.internal.util.io.ChannelTrackingDriverFactory; import org.neo4j.driver.testutil.ParallelizableIT; @@ -434,6 +439,40 @@ void shouldRollbackWhenOneOfQueriesFails() { assertEquals(0, countNodesByLabel("Node4")); } + @Test + void shouldTerminateTransactionAndRejectSubsequentRuns() { + // Given + var tx = (InternalTransaction) session.beginTransaction(); + var result = tx.run("UNWIND range(0, 5) AS x RETURN x"); + result.next(); + + // When + tx.terminate(); + + // Then + assertThrows(TransactionTerminatedException.class, () -> tx.run("UNWIND range(0, 5) AS x RETURN x")); + tx.close(); + } + + // The driver must handle the Neo.ClientError.Transaction.Terminated failure. If the failure does not arrive, it + // must prevent further PULLs from being dispatched and must surface a driver generated error. The already received + // records should be accessible. + @RepeatedTest(5) + void shouldTerminateTransaction() { + // Given + var tx = (InternalTransaction) session.beginTransaction(); + var fetchSize = Config.defaultConfig().fetchSize(); + var result = tx.run("UNWIND range(0, $limit) AS x RETURN x", Map.of("limit", fetchSize)); + + // When + tx.terminate(); + + // Then + assertThrows(TransactionTerminatedException.class, () -> LongStream.range(0, fetchSize + 1) + .forEach(ignored -> assertNotNull(result.next()))); + tx.close(); + } + private void shouldRunAndCloseAfterAction(Consumer txConsumer, boolean isCommit) { // When try (Transaction tx = session.beginTransaction()) { diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java index 658755e1d3..2098f85788 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java @@ -71,6 +71,7 @@ import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.exceptions.ConnectionReadTimeoutException; import org.neo4j.driver.exceptions.Neo4jException; +import org.neo4j.driver.exceptions.TransactionTerminatedException; import org.neo4j.driver.internal.FailableCursor; import org.neo4j.driver.internal.InternalBookmark; import org.neo4j.driver.internal.messaging.BoltProtocol; @@ -432,34 +433,34 @@ void shouldReturnCompletedWithNullStageOnClosingInactiveTransactionExceptCommitt } @Test - void shouldInterruptOnInterruptAsync() { + void shouldInterruptOnTerminateAsync() { // Given Connection connection = connectionMock(BoltProtocolV4.INSTANCE); UnmanagedTransaction tx = beginTx(connection); // When - await(tx.interruptAsync()); + await(tx.terminateAsync()); // Then - then(connection).should().reset(); + then(connection).should().reset(any(TransactionTerminatedException.class)); } @Test - void shouldServeTheSameStageOnInterruptAsync() { + void shouldServeTheSameStageOnTerminateAsync() { // Given Connection connection = connectionMock(BoltProtocolV4.INSTANCE); UnmanagedTransaction tx = beginTx(connection); // When - CompletionStage stage0 = tx.interruptAsync(); - CompletionStage stage1 = tx.interruptAsync(); + CompletionStage stage0 = tx.terminateAsync(); + CompletionStage stage1 = tx.terminateAsync(); // Then assertEquals(stage0, stage1); } @Test - void shouldHandleInterruptionWhenAlreadyInterrupted() throws ExecutionException, InterruptedException { + void shouldHandleTerminationWhenAlreadyTerminated() throws ExecutionException, InterruptedException { // Given var connection = connectionMock(BoltProtocolV4.INSTANCE); var exception = new Neo4jException("message"); @@ -473,7 +474,7 @@ void shouldHandleInterruptionWhenAlreadyInterrupted() throws ExecutionException, } catch (ExecutionException e) { actualException = e.getCause(); } - tx.interruptAsync().toCompletableFuture().get(); + tx.terminateAsync().toCompletableFuture().get(); // Then assertEquals(exception, actualException); diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/connection/DecoratedConnectionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/connection/DecoratedConnectionTest.java index 227cd66e23..ebd081097d 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/connection/DecoratedConnectionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/connection/DecoratedConnectionTest.java @@ -134,7 +134,7 @@ void shouldDelegateReset() { connection.reset(); - verify(mockConnection).reset(); + verify(mockConnection).reset(null); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalReactiveTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalReactiveTransactionTest.java index a4ab8117d0..f62ba59490 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalReactiveTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalReactiveTransactionTest.java @@ -35,33 +35,33 @@ public class InternalReactiveTransactionTest { @Test void shouldDelegateInterrupt() { // Given - UnmanagedTransaction utx = mock(UnmanagedTransaction.class); - given(utx.interruptAsync()).willReturn(completedFuture(null)); + var utx = mock(UnmanagedTransaction.class); + given(utx.terminateAsync()).willReturn(completedFuture(null)); tx = new InternalReactiveTransaction(utx); // When - StepVerifier.create(flowPublisherToFlux(tx.interrupt())) + StepVerifier.create(flowPublisherToFlux(tx.terminate())) .expectComplete() .verify(); // Then - then(utx).should().interruptAsync(); + then(utx).should().terminateAsync(); } @Test void shouldDelegateInterruptAndReportError() { // Given - UnmanagedTransaction utx = mock(UnmanagedTransaction.class); - RuntimeException e = mock(RuntimeException.class); - given(utx.interruptAsync()).willReturn(failedFuture(e)); + var utx = mock(UnmanagedTransaction.class); + var e = mock(RuntimeException.class); + given(utx.terminateAsync()).willReturn(failedFuture(e)); tx = new InternalReactiveTransaction(utx); // When - StepVerifier.create(flowPublisherToFlux(tx.interrupt())) + StepVerifier.create(flowPublisherToFlux(tx.terminate())) .expectErrorMatches(ar -> ar == e) .verify(); // Then - then(utx).should().interruptAsync(); + then(utx).should().terminateAsync(); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/FailingConnectionDriverFactory.java b/driver/src/test/java/org/neo4j/driver/internal/util/FailingConnectionDriverFactory.java index f3ed13395a..1387526fe8 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/util/FailingConnectionDriverFactory.java +++ b/driver/src/test/java/org/neo4j/driver/internal/util/FailingConnectionDriverFactory.java @@ -26,6 +26,7 @@ import org.neo4j.driver.AuthToken; import org.neo4j.driver.AuthTokenManager; import org.neo4j.driver.Config; +import org.neo4j.driver.exceptions.Neo4jException; import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.DriverFactory; import org.neo4j.driver.internal.cluster.RoutingContext; @@ -159,8 +160,8 @@ public void writeAndFlush( } @Override - public CompletionStage reset() { - return delegate.reset(); + public CompletionStage reset(Neo4jException error) { + return delegate.reset(error); } @Override diff --git a/driver/src/test/java/org/neo4j/driver/testutil/TestUtil.java b/driver/src/test/java/org/neo4j/driver/testutil/TestUtil.java index 516f1c918b..846065921e 100644 --- a/driver/src/test/java/org/neo4j/driver/testutil/TestUtil.java +++ b/driver/src/test/java/org/neo4j/driver/testutil/TestUtil.java @@ -73,6 +73,7 @@ import org.neo4j.driver.Result; import org.neo4j.driver.Session; import org.neo4j.driver.SessionConfig; +import org.neo4j.driver.exceptions.Neo4jException; import org.neo4j.driver.exceptions.ServiceUnavailableException; import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.NoOpBookmarkManager; @@ -467,6 +468,7 @@ public static Connection connectionMock(String databaseName, AccessMode mode, Bo setupSuccessResponse(connection, BeginMessage.class); when(connection.release()).thenReturn(completedWithNull()); when(connection.reset()).thenReturn(completedWithNull()); + when(connection.reset(any(Neo4jException.class))).thenReturn(completedWithNull()); } else { throw new IllegalArgumentException("Unsupported bolt protocol version: " + version); }