Skip to content

Notify handler of all errors in RoutedBoltConnection #1588

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 21, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,12 @@ public CompletionStage<BoltConnection> clear() {
@Override
public CompletionStage<Void> flush(ResponseHandler handler) {
return delegate.flush(new ResponseHandler() {
private Throwable error;
boolean notifyHandler = true;

@Override
public void onError(Throwable throwable) {
if (error == null) {
error = handledError(throwable);
handler.onError(error);
}
handler.onError(handledError(throwable, notifyHandler));
notifyHandler = false;
}

@Override
Expand Down Expand Up @@ -306,34 +304,36 @@ public boolean telemetrySupported() {
return delegate.telemetrySupported();
}

private Throwable handledError(Throwable receivedError) {
private Throwable handledError(Throwable receivedError, boolean notifyHandler) {
var error = FutureUtil.completionExceptionCause(receivedError);

if (error instanceof ServiceUnavailableException) {
return handledServiceUnavailableException(((ServiceUnavailableException) error));
} else if (error instanceof ClientException) {
return handledClientException(((ClientException) error));
} else if (error instanceof TransientException) {
return handledTransientException(((TransientException) error));
if (error instanceof ServiceUnavailableException exception) {
return handledServiceUnavailableException(exception, notifyHandler);
} else if (error instanceof ClientException exception) {
return handledClientException(exception, notifyHandler);
} else if (error instanceof TransientException exception) {
return handledTransientException(exception, notifyHandler);
} else {
return error;
}
}

private Throwable handledServiceUnavailableException(ServiceUnavailableException e) {
routingTableHandler.onConnectionFailure(serverAddress());
private Throwable handledServiceUnavailableException(ServiceUnavailableException e, boolean notifyHandler) {
if (notifyHandler) {
routingTableHandler.onConnectionFailure(serverAddress());
}
return new SessionExpiredException(format("Server at %s is no longer available", serverAddress()), e);
}

private Throwable handledTransientException(TransientException e) {
private Throwable handledTransientException(TransientException e, boolean notifyHandler) {
var errorCode = e.code();
if (Objects.equals(errorCode, "Neo.TransientError.General.DatabaseUnavailable")) {
if (Objects.equals(errorCode, "Neo.TransientError.General.DatabaseUnavailable") && notifyHandler) {
routingTableHandler.onConnectionFailure(serverAddress());
}
return e;
}

private Throwable handledClientException(ClientException e) {
private Throwable handledClientException(ClientException e, boolean notifyHandler) {
if (isFailureToWrite(e)) {
// The server is unaware of the session mode, so we have to implement this logic in the driver.
// In the future, we might be able to move this logic to the server.
Expand All @@ -349,7 +349,9 @@ private Throwable handledClientException(ClientException e) {
null);
}
case WRITE -> {
routingTableHandler.onWriteFailure(serverAddress());
if (notifyHandler) {
routingTableHandler.onWriteFailure(serverAddress());
}
return new SessionExpiredException(
format("Server at %s no longer accepts writes", serverAddress()));
}
Expand Down