Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.exceptions;

import java.io.Serial;

/**
* Indicates that the transaction has been terminated.
* @since 5.10
*/
public class TransactionTerminatedException extends ClientException {
@Serial
private static final long serialVersionUID = 7639191706067500206L;

/**
* Creates a new instance.
* @param message the message
*/
public TransactionTerminatedException(String message) {
super(message);
}

/**
* Creates a new instance.
* @param code the code
* @param message the message
*/
public TransactionTerminatedException(String code, String message) {
super(code, message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,22 @@ public boolean isOpen() {
return tx.isOpen();
}

/**
* <b>THIS IS A PRIVATE API</b>
* <p>
* Terminates the transaction by sending the Bolt {@code RESET} message and waiting for its response as long as the
* transaction has not already been terminated, is not closed or closing.
*
* @since 5.10
* @throws org.neo4j.driver.exceptions.ClientException if the transaction is closed or is closing
* @see org.neo4j.driver.exceptions.TransactionTerminatedException
*/
public void terminate() {
Futures.blockingGet(
tx.terminateAsync(),
() -> terminateConnectionOnThreadInterrupt("Thread interrupted while terminating the transaction"));
}

private void terminateConnectionOnThreadInterrupt(String reason) {
tx.connection().terminateAndRelease(reason);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.atomic.AtomicReference;
import org.neo4j.driver.Logger;
import org.neo4j.driver.Logging;
import org.neo4j.driver.exceptions.Neo4jException;
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.async.connection.ChannelAttributes;
import org.neo4j.driver.internal.async.inbound.ConnectionReadTimeoutHandler;
Expand Down Expand Up @@ -146,9 +147,9 @@ public void writeAndFlush(Message message1, ResponseHandler handler1, Message me
}

@Override
public CompletionStage<Void> reset() {
public CompletionStage<Void> reset(Neo4jException error) {
CompletableFuture<Void> result = new CompletableFuture<>();
ResetResponseHandler handler = new ResetResponseHandler(messageDispatcher, result);
ResetResponseHandler handler = new ResetResponseHandler(messageDispatcher, result, error);
writeResetMessageIfNeeded(handler, true);
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.neo4j.driver.async.ResultCursor;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.TransactionNestingException;
import org.neo4j.driver.exceptions.TransactionTerminatedException;
import org.neo4j.driver.internal.DatabaseBookmark;
import org.neo4j.driver.internal.DatabaseName;
import org.neo4j.driver.internal.FailableCursor;
Expand Down Expand Up @@ -175,7 +176,8 @@ public CompletionStage<Void> resetAsync() {
return existingTransactionOrNull()
.thenAccept(tx -> {
if (tx != null) {
tx.markTerminated(null);
tx.markTerminated(new TransactionTerminatedException(
"The transaction has been explicitly terminated by the driver"));
}
})
.thenCompose(ignore -> connectionStage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.neo4j.driver.internal.async;

import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.neo4j.driver.internal.util.Futures.asCompletionException;
import static org.neo4j.driver.internal.util.Futures.combineErrors;
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
Expand All @@ -44,6 +45,7 @@
import org.neo4j.driver.exceptions.AuthorizationExpiredException;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.ConnectionReadTimeoutException;
import org.neo4j.driver.exceptions.TransactionTerminatedException;
import org.neo4j.driver.internal.DatabaseBookmark;
import org.neo4j.driver.internal.cursor.AsyncResultCursor;
import org.neo4j.driver.internal.cursor.RxResultCursor;
Expand Down Expand Up @@ -210,10 +212,14 @@ private void ensureCanRunQueries() {
} else if (state == State.ROLLED_BACK) {
throw new ClientException("Cannot run more queries in this transaction, it has been rolled back");
} else if (state == State.TERMINATED) {
throw new ClientException(
"Cannot run more queries in this transaction, "
+ "it has either experienced an fatal error or was explicitly terminated",
causeOfTermination);
if (causeOfTermination instanceof TransactionTerminatedException transactionTerminatedException) {
throw transactionTerminatedException;
} else {
throw new ClientException(
"Cannot run more queries in this transaction, "
+ "it has either experienced an fatal error or was explicitly terminated",
causeOfTermination);
}
}
});
}
Expand Down Expand Up @@ -319,20 +325,21 @@ private CompletionStage<Void> closeAsync(boolean commit, boolean completeWithNul
return stage;
}

/**
* Marks transaction as terminated and sends {@code RESET} message over allocated connection.
* <p>
* <b>THIS METHOD IS NOT PART OF PUBLIC API. This method may be changed or removed at any moment in time.</b>
*
* @return {@code RESET} response stage
*/
public CompletionStage<Void> interruptAsync() {
public CompletionStage<Void> terminateAsync() {
return executeWithLock(lock, () -> {
if (interruptStage == null) {
markTerminated(null);
interruptStage = connection.reset();
if (!isOpen() || commitFuture != null || rollbackFuture != null) {
return failedFuture(new ClientException("Can't terminate closed or closing transaction"));
} else {
if (state == State.TERMINATED) {
return interruptStage != null ? interruptStage : completedFuture(null);
} else {
var error = new TransactionTerminatedException(
"The transaction has been explicitly terminated by the driver");
markTerminated(error);
interruptStage = connection.reset(error);
return interruptStage;
}
}
return interruptStage;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.concurrent.CompletionStage;
import org.neo4j.driver.AccessMode;
import org.neo4j.driver.exceptions.Neo4jException;
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.DatabaseName;
import org.neo4j.driver.internal.DirectConnectionProvider;
Expand Down Expand Up @@ -84,8 +85,8 @@ public void writeAndFlush(Message message1, ResponseHandler handler1, Message me
}

@Override
public CompletionStage<Void> reset() {
return delegate.reset();
public CompletionStage<Void> reset(Neo4jException error) {
return delegate.reset(error);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.concurrent.CompletionStage;
import org.neo4j.driver.AccessMode;
import org.neo4j.driver.exceptions.Neo4jException;
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.DatabaseName;
import org.neo4j.driver.internal.RoutingErrorHandler;
Expand Down Expand Up @@ -84,8 +85,8 @@ public void writeAndFlush(Message message1, ResponseHandler handler1, Message me
}

@Override
public CompletionStage<Void> reset() {
return delegate.reset();
public CompletionStage<Void> reset(Neo4jException error) {
return delegate.reset(error);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import org.neo4j.driver.Logger;
import org.neo4j.driver.Logging;
Expand All @@ -36,13 +37,15 @@
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.TokenExpiredException;
import org.neo4j.driver.exceptions.TokenExpiredRetryableException;
import org.neo4j.driver.internal.handlers.PullAllResponseHandler;
import org.neo4j.driver.internal.handlers.ResetResponseHandler;
import org.neo4j.driver.internal.logging.ChannelActivityLogger;
import org.neo4j.driver.internal.logging.ChannelErrorLogger;
import org.neo4j.driver.internal.messaging.ResponseMessageHandler;
import org.neo4j.driver.internal.security.StaticAuthTokenManager;
import org.neo4j.driver.internal.spi.ResponseHandler;
import org.neo4j.driver.internal.util.ErrorUtil;
import org.neo4j.driver.internal.value.BooleanValue;

public class InboundMessageDispatcher implements ResponseMessageHandler {
private final Channel channel;
Expand Down Expand Up @@ -88,7 +91,19 @@ public void handleSuccessMessage(Map<String, Value> meta) {
log.debug("S: SUCCESS %s", meta);
invokeBeforeLastHandlerHook(HandlerHook.MessageType.SUCCESS);
ResponseHandler handler = removeHandler();
handler.onSuccess(meta);
getResetResponseHandler()
.flatMap(ResetResponseHandler::error)
.ifPresentOrElse(
error -> {
if (handler instanceof PullAllResponseHandler
&& meta.getOrDefault("has_more", BooleanValue.FALSE)
.asBoolean()) {
handler.onFailure(error);
} else {
handler.onSuccess(meta);
}
},
() -> handler.onSuccess(meta));
}

@Override
Expand Down Expand Up @@ -146,22 +161,32 @@ public void handleFailureMessage(String code, String message) {
public void handleIgnoredMessage() {
log.debug("S: IGNORED");

ResponseHandler handler = removeHandler();
var handler = removeHandler();

Throwable error;
if (currentError != null) {
error = currentError;
} else {
log.warn(
"Received IGNORED message for handler %s but error is missing and RESET is not in progress. "
+ "Current handlers %s",
handler, handlers);

error = new ClientException("Database ignored the request");
var resetHandlerOpt = getResetResponseHandler();
if (resetHandlerOpt.isEmpty()) {
log.warn(
"Received IGNORED message for handler %s but error is missing and RESET is not in progress. Current handlers %s",
handler, handlers);
}
error = resetHandlerOpt
.flatMap(ResetResponseHandler::error)
.orElseGet(() -> new ClientException("Database ignored the request"));
}
handler.onFailure(error);
}

private Optional<ResetResponseHandler> getResetResponseHandler() {
return handlers.stream()
.filter(nextHandler -> nextHandler instanceof ResetResponseHandler)
.map(nextHandler -> (ResetResponseHandler) nextHandler)
.findFirst();
}

public void handleChannelInactive(Throwable cause) {
// report issue if the connection has not been terminated as a result of a graceful shutdown request from its
// parent pool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,33 @@
package org.neo4j.driver.internal.handlers;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.neo4j.driver.Value;
import org.neo4j.driver.exceptions.Neo4jException;
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
import org.neo4j.driver.internal.spi.ResponseHandler;

public class ResetResponseHandler implements ResponseHandler {
private final InboundMessageDispatcher messageDispatcher;
private final CompletableFuture<Void> completionFuture;
private final Neo4jException error;

public ResetResponseHandler(InboundMessageDispatcher messageDispatcher) {
this(messageDispatcher, null);
}

public ResetResponseHandler(InboundMessageDispatcher messageDispatcher, CompletableFuture<Void> completionFuture) {
this(messageDispatcher, completionFuture, null);
}

public ResetResponseHandler(
InboundMessageDispatcher messageDispatcher,
CompletableFuture<Void> completionFuture,
Neo4jException error) {
this.messageDispatcher = messageDispatcher;
this.completionFuture = completionFuture;
this.error = error;
}

@Override
Expand All @@ -52,6 +63,10 @@ public final void onRecord(Value[] fields) {
throw new UnsupportedOperationException();
}

public Optional<Neo4jException> error() {
return Optional.ofNullable(error);
}

private void resetCompleted(boolean success) {
messageDispatcher.clearCurrentError();
if (completionFuture != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,16 @@ public Publisher<ReactiveResult> run(Query query) {
}

/**
* Marks transaction as terminated and sends {@code RESET} message over allocated connection.
* <b>THIS IS A PRIVATE API</b>
* <p>
* <b>THIS METHOD IS NOT PART OF PUBLIC API. This method may be changed or removed at any moment in time.</b>
* Terminates the transaction by sending the Bolt {@code RESET} message and waiting for its response as long as the
* transaction has not already been terminated, is not closed or closing.
*
* @return {@code RESET} response publisher
* @return completion publisher (the {@code RESET} completion publisher if the message was sent)
* @since 5.10
*/
public Publisher<Void> interrupt() {
return publisherToFlowPublisher(Mono.fromCompletionStage(tx.interruptAsync()));
public Publisher<Void> terminate() {
return publisherToFlowPublisher(Mono.fromCompletionStage(tx.terminateAsync()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,16 @@ public Publisher<ReactiveResult> run(Query query) {
}

/**
* Marks transaction as terminated and sends {@code RESET} message over allocated connection.
* <b>THIS IS A PRIVATE API</b>
* <p>
* <b>THIS METHOD IS NOT PART OF PUBLIC API. This method may be changed or removed at any moment in time.</b>
* Terminates the transaction by sending the Bolt {@code RESET} message and waiting for its response as long as the
* transaction has not already been terminated, is not closed or closing.
*
* @return {@code RESET} response publisher
* @return completion publisher (the {@code RESET} completion publisher if the message was sent)
* @since 5.10
*/
public Publisher<Void> interrupt() {
return Mono.fromCompletionStage(tx.interruptAsync());
public Publisher<Void> terminate() {
return Mono.fromCompletionStage(tx.terminateAsync());
}

@Override
Expand Down
Loading