diff --git a/driver/src/main/java/org/neo4j/driver/exceptions/TransactionTerminatedException.java b/driver/src/main/java/org/neo4j/driver/exceptions/TransactionTerminatedException.java
new file mode 100644
index 0000000000..71ab205efb
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/exceptions/TransactionTerminatedException.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) "Neo4j"
+ * Neo4j Sweden AB [http://neo4j.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.exceptions;
+
+import java.io.Serial;
+
+/**
+ * Indicates that the transaction has been terminated.
+ * @since 5.10
+ */
+public class TransactionTerminatedException extends ClientException {
+ @Serial
+ private static final long serialVersionUID = 7639191706067500206L;
+
+ /**
+ * Creates a new instance.
+ * @param message the message
+ */
+ public TransactionTerminatedException(String message) {
+ super(message);
+ }
+
+ /**
+ * Creates a new instance.
+ * @param code the code
+ * @param message the message
+ */
+ public TransactionTerminatedException(String code, String message) {
+ super(code, message);
+ }
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/InternalTransaction.java
index 37e7b3a6ef..5af91e1a71 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/InternalTransaction.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/InternalTransaction.java
@@ -66,6 +66,22 @@ public boolean isOpen() {
return tx.isOpen();
}
+ /**
+ * THIS IS A PRIVATE API
+ *
+ * Terminates the transaction by sending the Bolt {@code RESET} message and waiting for its response as long as the
+ * transaction has not already been terminated, is not closed or closing.
+ *
+ * @since 5.10
+ * @throws org.neo4j.driver.exceptions.ClientException if the transaction is closed or is closing
+ * @see org.neo4j.driver.exceptions.TransactionTerminatedException
+ */
+ public void terminate() {
+ Futures.blockingGet(
+ tx.terminateAsync(),
+ () -> terminateConnectionOnThreadInterrupt("Thread interrupted while terminating the transaction"));
+ }
+
private void terminateConnectionOnThreadInterrupt(String reason) {
tx.connection().terminateAndRelease(reason);
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkConnection.java
index 5e4fb4883d..e96962ef13 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkConnection.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkConnection.java
@@ -32,6 +32,7 @@
import java.util.concurrent.atomic.AtomicReference;
import org.neo4j.driver.Logger;
import org.neo4j.driver.Logging;
+import org.neo4j.driver.exceptions.Neo4jException;
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.async.connection.ChannelAttributes;
import org.neo4j.driver.internal.async.inbound.ConnectionReadTimeoutHandler;
@@ -146,9 +147,9 @@ public void writeAndFlush(Message message1, ResponseHandler handler1, Message me
}
@Override
- public CompletionStage reset() {
+ public CompletionStage reset(Neo4jException error) {
CompletableFuture result = new CompletableFuture<>();
- ResetResponseHandler handler = new ResetResponseHandler(messageDispatcher, result);
+ ResetResponseHandler handler = new ResetResponseHandler(messageDispatcher, result, error);
writeResetMessageIfNeeded(handler, true);
return result;
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java
index f57df5218e..b4c3e97ac0 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java
@@ -43,6 +43,7 @@
import org.neo4j.driver.async.ResultCursor;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.TransactionNestingException;
+import org.neo4j.driver.exceptions.TransactionTerminatedException;
import org.neo4j.driver.internal.DatabaseBookmark;
import org.neo4j.driver.internal.DatabaseName;
import org.neo4j.driver.internal.FailableCursor;
@@ -175,7 +176,8 @@ public CompletionStage resetAsync() {
return existingTransactionOrNull()
.thenAccept(tx -> {
if (tx != null) {
- tx.markTerminated(null);
+ tx.markTerminated(new TransactionTerminatedException(
+ "The transaction has been explicitly terminated by the driver"));
}
})
.thenCompose(ignore -> connectionStage)
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java
index 494f845bfa..ff82aa6602 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java
@@ -18,6 +18,7 @@
*/
package org.neo4j.driver.internal.async;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.neo4j.driver.internal.util.Futures.asCompletionException;
import static org.neo4j.driver.internal.util.Futures.combineErrors;
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
@@ -44,6 +45,7 @@
import org.neo4j.driver.exceptions.AuthorizationExpiredException;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.ConnectionReadTimeoutException;
+import org.neo4j.driver.exceptions.TransactionTerminatedException;
import org.neo4j.driver.internal.DatabaseBookmark;
import org.neo4j.driver.internal.cursor.AsyncResultCursor;
import org.neo4j.driver.internal.cursor.RxResultCursor;
@@ -210,10 +212,14 @@ private void ensureCanRunQueries() {
} else if (state == State.ROLLED_BACK) {
throw new ClientException("Cannot run more queries in this transaction, it has been rolled back");
} else if (state == State.TERMINATED) {
- throw new ClientException(
- "Cannot run more queries in this transaction, "
- + "it has either experienced an fatal error or was explicitly terminated",
- causeOfTermination);
+ if (causeOfTermination instanceof TransactionTerminatedException transactionTerminatedException) {
+ throw transactionTerminatedException;
+ } else {
+ throw new ClientException(
+ "Cannot run more queries in this transaction, "
+ + "it has either experienced an fatal error or was explicitly terminated",
+ causeOfTermination);
+ }
}
});
}
@@ -319,20 +325,21 @@ private CompletionStage closeAsync(boolean commit, boolean completeWithNul
return stage;
}
- /**
- * Marks transaction as terminated and sends {@code RESET} message over allocated connection.
- *
- * THIS METHOD IS NOT PART OF PUBLIC API. This method may be changed or removed at any moment in time.
- *
- * @return {@code RESET} response stage
- */
- public CompletionStage interruptAsync() {
+ public CompletionStage terminateAsync() {
return executeWithLock(lock, () -> {
- if (interruptStage == null) {
- markTerminated(null);
- interruptStage = connection.reset();
+ if (!isOpen() || commitFuture != null || rollbackFuture != null) {
+ return failedFuture(new ClientException("Can't terminate closed or closing transaction"));
+ } else {
+ if (state == State.TERMINATED) {
+ return interruptStage != null ? interruptStage : completedFuture(null);
+ } else {
+ var error = new TransactionTerminatedException(
+ "The transaction has been explicitly terminated by the driver");
+ markTerminated(error);
+ interruptStage = connection.reset(error);
+ return interruptStage;
+ }
}
- return interruptStage;
});
}
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/connection/DirectConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/connection/DirectConnection.java
index 010618b4a8..0474466c3f 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/connection/DirectConnection.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/connection/DirectConnection.java
@@ -20,6 +20,7 @@
import java.util.concurrent.CompletionStage;
import org.neo4j.driver.AccessMode;
+import org.neo4j.driver.exceptions.Neo4jException;
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.DatabaseName;
import org.neo4j.driver.internal.DirectConnectionProvider;
@@ -84,8 +85,8 @@ public void writeAndFlush(Message message1, ResponseHandler handler1, Message me
}
@Override
- public CompletionStage reset() {
- return delegate.reset();
+ public CompletionStage reset(Neo4jException error) {
+ return delegate.reset(error);
}
@Override
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/connection/RoutingConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/connection/RoutingConnection.java
index fec880ace0..35bb0ceb9c 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/connection/RoutingConnection.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/connection/RoutingConnection.java
@@ -20,6 +20,7 @@
import java.util.concurrent.CompletionStage;
import org.neo4j.driver.AccessMode;
+import org.neo4j.driver.exceptions.Neo4jException;
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.DatabaseName;
import org.neo4j.driver.internal.RoutingErrorHandler;
@@ -84,8 +85,8 @@ public void writeAndFlush(Message message1, ResponseHandler handler1, Message me
}
@Override
- public CompletionStage reset() {
- return delegate.reset();
+ public CompletionStage reset(Neo4jException error) {
+ return delegate.reset(error);
}
@Override
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java
index 0960460ed7..d270258d5e 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java
@@ -28,6 +28,7 @@
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Map;
+import java.util.Optional;
import java.util.Queue;
import org.neo4j.driver.Logger;
import org.neo4j.driver.Logging;
@@ -36,6 +37,7 @@
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.TokenExpiredException;
import org.neo4j.driver.exceptions.TokenExpiredRetryableException;
+import org.neo4j.driver.internal.handlers.PullAllResponseHandler;
import org.neo4j.driver.internal.handlers.ResetResponseHandler;
import org.neo4j.driver.internal.logging.ChannelActivityLogger;
import org.neo4j.driver.internal.logging.ChannelErrorLogger;
@@ -43,6 +45,7 @@
import org.neo4j.driver.internal.security.StaticAuthTokenManager;
import org.neo4j.driver.internal.spi.ResponseHandler;
import org.neo4j.driver.internal.util.ErrorUtil;
+import org.neo4j.driver.internal.value.BooleanValue;
public class InboundMessageDispatcher implements ResponseMessageHandler {
private final Channel channel;
@@ -88,7 +91,19 @@ public void handleSuccessMessage(Map meta) {
log.debug("S: SUCCESS %s", meta);
invokeBeforeLastHandlerHook(HandlerHook.MessageType.SUCCESS);
ResponseHandler handler = removeHandler();
- handler.onSuccess(meta);
+ getResetResponseHandler()
+ .flatMap(ResetResponseHandler::error)
+ .ifPresentOrElse(
+ error -> {
+ if (handler instanceof PullAllResponseHandler
+ && meta.getOrDefault("has_more", BooleanValue.FALSE)
+ .asBoolean()) {
+ handler.onFailure(error);
+ } else {
+ handler.onSuccess(meta);
+ }
+ },
+ () -> handler.onSuccess(meta));
}
@Override
@@ -146,22 +161,32 @@ public void handleFailureMessage(String code, String message) {
public void handleIgnoredMessage() {
log.debug("S: IGNORED");
- ResponseHandler handler = removeHandler();
+ var handler = removeHandler();
Throwable error;
if (currentError != null) {
error = currentError;
} else {
- log.warn(
- "Received IGNORED message for handler %s but error is missing and RESET is not in progress. "
- + "Current handlers %s",
- handler, handlers);
-
- error = new ClientException("Database ignored the request");
+ var resetHandlerOpt = getResetResponseHandler();
+ if (resetHandlerOpt.isEmpty()) {
+ log.warn(
+ "Received IGNORED message for handler %s but error is missing and RESET is not in progress. Current handlers %s",
+ handler, handlers);
+ }
+ error = resetHandlerOpt
+ .flatMap(ResetResponseHandler::error)
+ .orElseGet(() -> new ClientException("Database ignored the request"));
}
handler.onFailure(error);
}
+ private Optional getResetResponseHandler() {
+ return handlers.stream()
+ .filter(nextHandler -> nextHandler instanceof ResetResponseHandler)
+ .map(nextHandler -> (ResetResponseHandler) nextHandler)
+ .findFirst();
+ }
+
public void handleChannelInactive(Throwable cause) {
// report issue if the connection has not been terminated as a result of a graceful shutdown request from its
// parent pool
diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java
index 82cd04d207..db8fa27777 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java
@@ -19,22 +19,33 @@
package org.neo4j.driver.internal.handlers;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.neo4j.driver.Value;
+import org.neo4j.driver.exceptions.Neo4jException;
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
import org.neo4j.driver.internal.spi.ResponseHandler;
public class ResetResponseHandler implements ResponseHandler {
private final InboundMessageDispatcher messageDispatcher;
private final CompletableFuture completionFuture;
+ private final Neo4jException error;
public ResetResponseHandler(InboundMessageDispatcher messageDispatcher) {
this(messageDispatcher, null);
}
public ResetResponseHandler(InboundMessageDispatcher messageDispatcher, CompletableFuture completionFuture) {
+ this(messageDispatcher, completionFuture, null);
+ }
+
+ public ResetResponseHandler(
+ InboundMessageDispatcher messageDispatcher,
+ CompletableFuture completionFuture,
+ Neo4jException error) {
this.messageDispatcher = messageDispatcher;
this.completionFuture = completionFuture;
+ this.error = error;
}
@Override
@@ -52,6 +63,10 @@ public final void onRecord(Value[] fields) {
throw new UnsupportedOperationException();
}
+ public Optional error() {
+ return Optional.ofNullable(error);
+ }
+
private void resetCompleted(boolean success) {
messageDispatcher.clearCurrentError();
if (completionFuture != null) {
diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveTransaction.java
index c5a089cded..b8d8ccb70c 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveTransaction.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveTransaction.java
@@ -61,14 +61,16 @@ public Publisher run(Query query) {
}
/**
- * Marks transaction as terminated and sends {@code RESET} message over allocated connection.
+ * THIS IS A PRIVATE API
*
- * THIS METHOD IS NOT PART OF PUBLIC API. This method may be changed or removed at any moment in time.
+ * Terminates the transaction by sending the Bolt {@code RESET} message and waiting for its response as long as the
+ * transaction has not already been terminated, is not closed or closing.
*
- * @return {@code RESET} response publisher
+ * @return completion publisher (the {@code RESET} completion publisher if the message was sent)
+ * @since 5.10
*/
- public Publisher interrupt() {
- return publisherToFlowPublisher(Mono.fromCompletionStage(tx.interruptAsync()));
+ public Publisher terminate() {
+ return publisherToFlowPublisher(Mono.fromCompletionStage(tx.terminateAsync()));
}
@Override
diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactivestreams/InternalReactiveTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/reactivestreams/InternalReactiveTransaction.java
index b91419e6cf..01dabf8e8a 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/reactivestreams/InternalReactiveTransaction.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/reactivestreams/InternalReactiveTransaction.java
@@ -60,14 +60,16 @@ public Publisher run(Query query) {
}
/**
- * Marks transaction as terminated and sends {@code RESET} message over allocated connection.
+ * THIS IS A PRIVATE API
*
- * THIS METHOD IS NOT PART OF PUBLIC API. This method may be changed or removed at any moment in time.
+ * Terminates the transaction by sending the Bolt {@code RESET} message and waiting for its response as long as the
+ * transaction has not already been terminated, is not closed or closing.
*
- * @return {@code RESET} response publisher
+ * @return completion publisher (the {@code RESET} completion publisher if the message was sent)
+ * @since 5.10
*/
- public Publisher interrupt() {
- return Mono.fromCompletionStage(tx.interruptAsync());
+ public Publisher terminate() {
+ return Mono.fromCompletionStage(tx.terminateAsync());
}
@Override
diff --git a/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java b/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java
index 54fe232953..7fa37cc317 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java
@@ -22,6 +22,7 @@
import java.util.concurrent.CompletionStage;
import org.neo4j.driver.AccessMode;
+import org.neo4j.driver.exceptions.Neo4jException;
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.DatabaseName;
import org.neo4j.driver.internal.messaging.BoltProtocol;
@@ -42,7 +43,11 @@ public interface Connection {
void writeAndFlush(Message message1, ResponseHandler handler1, Message message2, ResponseHandler handler2);
- CompletionStage reset();
+ default CompletionStage reset() {
+ return reset(null);
+ }
+
+ CompletionStage reset(Neo4jException error);
CompletionStage release();
diff --git a/driver/src/main/java/org/neo4j/driver/internal/util/ErrorUtil.java b/driver/src/main/java/org/neo4j/driver/internal/util/ErrorUtil.java
index 4a2f2a18f5..615cc93ef6 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/util/ErrorUtil.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/util/ErrorUtil.java
@@ -32,6 +32,7 @@
import org.neo4j.driver.exceptions.SecurityException;
import org.neo4j.driver.exceptions.ServiceUnavailableException;
import org.neo4j.driver.exceptions.TokenExpiredException;
+import org.neo4j.driver.exceptions.TransactionTerminatedException;
import org.neo4j.driver.exceptions.TransientException;
public final class ErrorUtil {
@@ -72,6 +73,8 @@ public static Neo4jException newNeo4jError(String code, String message) {
} else {
if (code.equalsIgnoreCase("Neo.ClientError.Database.DatabaseNotFound")) {
return new FatalDiscoveryException(code, message);
+ } else if (code.equalsIgnoreCase("Neo.ClientError.Transaction.Terminated")) {
+ return new TransactionTerminatedException(code, message);
} else {
return new ClientException(code, message);
}
diff --git a/driver/src/test/java/org/neo4j/driver/integration/TransactionIT.java b/driver/src/test/java/org/neo4j/driver/integration/TransactionIT.java
index 64b9e8b372..ea9cb1c399 100644
--- a/driver/src/test/java/org/neo4j/driver/integration/TransactionIT.java
+++ b/driver/src/test/java/org/neo4j/driver/integration/TransactionIT.java
@@ -25,6 +25,7 @@
import static org.hamcrest.junit.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
@@ -35,6 +36,8 @@
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
+import java.util.stream.LongStream;
+import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.neo4j.driver.Config;
@@ -46,6 +49,8 @@
import org.neo4j.driver.Value;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.ServiceUnavailableException;
+import org.neo4j.driver.exceptions.TransactionTerminatedException;
+import org.neo4j.driver.internal.InternalTransaction;
import org.neo4j.driver.internal.security.SecurityPlanImpl;
import org.neo4j.driver.internal.util.io.ChannelTrackingDriverFactory;
import org.neo4j.driver.testutil.ParallelizableIT;
@@ -434,6 +439,40 @@ void shouldRollbackWhenOneOfQueriesFails() {
assertEquals(0, countNodesByLabel("Node4"));
}
+ @Test
+ void shouldTerminateTransactionAndRejectSubsequentRuns() {
+ // Given
+ var tx = (InternalTransaction) session.beginTransaction();
+ var result = tx.run("UNWIND range(0, 5) AS x RETURN x");
+ result.next();
+
+ // When
+ tx.terminate();
+
+ // Then
+ assertThrows(TransactionTerminatedException.class, () -> tx.run("UNWIND range(0, 5) AS x RETURN x"));
+ tx.close();
+ }
+
+ // The driver must handle the Neo.ClientError.Transaction.Terminated failure. If the failure does not arrive, it
+ // must prevent further PULLs from being dispatched and must surface a driver generated error. The already received
+ // records should be accessible.
+ @RepeatedTest(5)
+ void shouldTerminateTransaction() {
+ // Given
+ var tx = (InternalTransaction) session.beginTransaction();
+ var fetchSize = Config.defaultConfig().fetchSize();
+ var result = tx.run("UNWIND range(0, $limit) AS x RETURN x", Map.of("limit", fetchSize));
+
+ // When
+ tx.terminate();
+
+ // Then
+ assertThrows(TransactionTerminatedException.class, () -> LongStream.range(0, fetchSize + 1)
+ .forEach(ignored -> assertNotNull(result.next())));
+ tx.close();
+ }
+
private void shouldRunAndCloseAfterAction(Consumer txConsumer, boolean isCommit) {
// When
try (Transaction tx = session.beginTransaction()) {
diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java
index 658755e1d3..2098f85788 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java
@@ -71,6 +71,7 @@
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.ConnectionReadTimeoutException;
import org.neo4j.driver.exceptions.Neo4jException;
+import org.neo4j.driver.exceptions.TransactionTerminatedException;
import org.neo4j.driver.internal.FailableCursor;
import org.neo4j.driver.internal.InternalBookmark;
import org.neo4j.driver.internal.messaging.BoltProtocol;
@@ -432,34 +433,34 @@ void shouldReturnCompletedWithNullStageOnClosingInactiveTransactionExceptCommitt
}
@Test
- void shouldInterruptOnInterruptAsync() {
+ void shouldInterruptOnTerminateAsync() {
// Given
Connection connection = connectionMock(BoltProtocolV4.INSTANCE);
UnmanagedTransaction tx = beginTx(connection);
// When
- await(tx.interruptAsync());
+ await(tx.terminateAsync());
// Then
- then(connection).should().reset();
+ then(connection).should().reset(any(TransactionTerminatedException.class));
}
@Test
- void shouldServeTheSameStageOnInterruptAsync() {
+ void shouldServeTheSameStageOnTerminateAsync() {
// Given
Connection connection = connectionMock(BoltProtocolV4.INSTANCE);
UnmanagedTransaction tx = beginTx(connection);
// When
- CompletionStage stage0 = tx.interruptAsync();
- CompletionStage stage1 = tx.interruptAsync();
+ CompletionStage stage0 = tx.terminateAsync();
+ CompletionStage stage1 = tx.terminateAsync();
// Then
assertEquals(stage0, stage1);
}
@Test
- void shouldHandleInterruptionWhenAlreadyInterrupted() throws ExecutionException, InterruptedException {
+ void shouldHandleTerminationWhenAlreadyTerminated() throws ExecutionException, InterruptedException {
// Given
var connection = connectionMock(BoltProtocolV4.INSTANCE);
var exception = new Neo4jException("message");
@@ -473,7 +474,7 @@ void shouldHandleInterruptionWhenAlreadyInterrupted() throws ExecutionException,
} catch (ExecutionException e) {
actualException = e.getCause();
}
- tx.interruptAsync().toCompletableFuture().get();
+ tx.terminateAsync().toCompletableFuture().get();
// Then
assertEquals(exception, actualException);
diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/connection/DecoratedConnectionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/connection/DecoratedConnectionTest.java
index 227cd66e23..ebd081097d 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/async/connection/DecoratedConnectionTest.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/async/connection/DecoratedConnectionTest.java
@@ -134,7 +134,7 @@ void shouldDelegateReset() {
connection.reset();
- verify(mockConnection).reset();
+ verify(mockConnection).reset(null);
}
@Test
diff --git a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalReactiveTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalReactiveTransactionTest.java
index a4ab8117d0..f62ba59490 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalReactiveTransactionTest.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalReactiveTransactionTest.java
@@ -35,33 +35,33 @@ public class InternalReactiveTransactionTest {
@Test
void shouldDelegateInterrupt() {
// Given
- UnmanagedTransaction utx = mock(UnmanagedTransaction.class);
- given(utx.interruptAsync()).willReturn(completedFuture(null));
+ var utx = mock(UnmanagedTransaction.class);
+ given(utx.terminateAsync()).willReturn(completedFuture(null));
tx = new InternalReactiveTransaction(utx);
// When
- StepVerifier.create(flowPublisherToFlux(tx.interrupt()))
+ StepVerifier.create(flowPublisherToFlux(tx.terminate()))
.expectComplete()
.verify();
// Then
- then(utx).should().interruptAsync();
+ then(utx).should().terminateAsync();
}
@Test
void shouldDelegateInterruptAndReportError() {
// Given
- UnmanagedTransaction utx = mock(UnmanagedTransaction.class);
- RuntimeException e = mock(RuntimeException.class);
- given(utx.interruptAsync()).willReturn(failedFuture(e));
+ var utx = mock(UnmanagedTransaction.class);
+ var e = mock(RuntimeException.class);
+ given(utx.terminateAsync()).willReturn(failedFuture(e));
tx = new InternalReactiveTransaction(utx);
// When
- StepVerifier.create(flowPublisherToFlux(tx.interrupt()))
+ StepVerifier.create(flowPublisherToFlux(tx.terminate()))
.expectErrorMatches(ar -> ar == e)
.verify();
// Then
- then(utx).should().interruptAsync();
+ then(utx).should().terminateAsync();
}
}
diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/FailingConnectionDriverFactory.java b/driver/src/test/java/org/neo4j/driver/internal/util/FailingConnectionDriverFactory.java
index f3ed13395a..1387526fe8 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/util/FailingConnectionDriverFactory.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/util/FailingConnectionDriverFactory.java
@@ -26,6 +26,7 @@
import org.neo4j.driver.AuthToken;
import org.neo4j.driver.AuthTokenManager;
import org.neo4j.driver.Config;
+import org.neo4j.driver.exceptions.Neo4jException;
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.DriverFactory;
import org.neo4j.driver.internal.cluster.RoutingContext;
@@ -159,8 +160,8 @@ public void writeAndFlush(
}
@Override
- public CompletionStage reset() {
- return delegate.reset();
+ public CompletionStage reset(Neo4jException error) {
+ return delegate.reset(error);
}
@Override
diff --git a/driver/src/test/java/org/neo4j/driver/testutil/TestUtil.java b/driver/src/test/java/org/neo4j/driver/testutil/TestUtil.java
index 516f1c918b..846065921e 100644
--- a/driver/src/test/java/org/neo4j/driver/testutil/TestUtil.java
+++ b/driver/src/test/java/org/neo4j/driver/testutil/TestUtil.java
@@ -73,6 +73,7 @@
import org.neo4j.driver.Result;
import org.neo4j.driver.Session;
import org.neo4j.driver.SessionConfig;
+import org.neo4j.driver.exceptions.Neo4jException;
import org.neo4j.driver.exceptions.ServiceUnavailableException;
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.NoOpBookmarkManager;
@@ -467,6 +468,7 @@ public static Connection connectionMock(String databaseName, AccessMode mode, Bo
setupSuccessResponse(connection, BeginMessage.class);
when(connection.release()).thenReturn(completedWithNull());
when(connection.reset()).thenReturn(completedWithNull());
+ when(connection.reset(any(Neo4jException.class))).thenReturn(completedWithNull());
} else {
throw new IllegalArgumentException("Unsupported bolt protocol version: " + version);
}