diff --git a/driver/src/main/java/org/neo4j/driver/internal/bolt/routedimpl/RoutedBoltConnection.java b/driver/src/main/java/org/neo4j/driver/internal/bolt/routedimpl/RoutedBoltConnection.java index 1c33772a14..044761a1d2 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/bolt/routedimpl/RoutedBoltConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/bolt/routedimpl/RoutedBoltConnection.java @@ -183,14 +183,12 @@ public CompletionStage clear() { @Override public CompletionStage flush(ResponseHandler handler) { return delegate.flush(new ResponseHandler() { - private Throwable error; + boolean notifyHandler = true; @Override public void onError(Throwable throwable) { - if (error == null) { - error = handledError(throwable); - handler.onError(error); - } + handler.onError(handledError(throwable, notifyHandler)); + notifyHandler = false; } @Override @@ -306,34 +304,36 @@ public boolean telemetrySupported() { return delegate.telemetrySupported(); } - private Throwable handledError(Throwable receivedError) { + private Throwable handledError(Throwable receivedError, boolean notifyHandler) { var error = FutureUtil.completionExceptionCause(receivedError); - if (error instanceof ServiceUnavailableException) { - return handledServiceUnavailableException(((ServiceUnavailableException) error)); - } else if (error instanceof ClientException) { - return handledClientException(((ClientException) error)); - } else if (error instanceof TransientException) { - return handledTransientException(((TransientException) error)); + if (error instanceof ServiceUnavailableException exception) { + return handledServiceUnavailableException(exception, notifyHandler); + } else if (error instanceof ClientException exception) { + return handledClientException(exception, notifyHandler); + } else if (error instanceof TransientException exception) { + return handledTransientException(exception, notifyHandler); } else { return error; } } - private Throwable handledServiceUnavailableException(ServiceUnavailableException e) { - routingTableHandler.onConnectionFailure(serverAddress()); + private Throwable handledServiceUnavailableException(ServiceUnavailableException e, boolean notifyHandler) { + if (notifyHandler) { + routingTableHandler.onConnectionFailure(serverAddress()); + } return new SessionExpiredException(format("Server at %s is no longer available", serverAddress()), e); } - private Throwable handledTransientException(TransientException e) { + private Throwable handledTransientException(TransientException e, boolean notifyHandler) { var errorCode = e.code(); - if (Objects.equals(errorCode, "Neo.TransientError.General.DatabaseUnavailable")) { + if (Objects.equals(errorCode, "Neo.TransientError.General.DatabaseUnavailable") && notifyHandler) { routingTableHandler.onConnectionFailure(serverAddress()); } return e; } - private Throwable handledClientException(ClientException e) { + private Throwable handledClientException(ClientException e, boolean notifyHandler) { if (isFailureToWrite(e)) { // The server is unaware of the session mode, so we have to implement this logic in the driver. // In the future, we might be able to move this logic to the server. @@ -349,7 +349,9 @@ private Throwable handledClientException(ClientException e) { null); } case WRITE -> { - routingTableHandler.onWriteFailure(serverAddress()); + if (notifyHandler) { + routingTableHandler.onWriteFailure(serverAddress()); + } return new SessionExpiredException( format("Server at %s no longer accepts writes", serverAddress())); }