Skip to content

Commit f6cd08c

Browse files
authored
Notify handler of all errors in RoutedBoltConnection (#1588)
1 parent 15d498b commit f6cd08c

File tree

1 file changed

+20
-18
lines changed

1 file changed

+20
-18
lines changed

driver/src/main/java/org/neo4j/driver/internal/bolt/routedimpl/RoutedBoltConnection.java

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -183,14 +183,12 @@ public CompletionStage<BoltConnection> clear() {
183183
@Override
184184
public CompletionStage<Void> flush(ResponseHandler handler) {
185185
return delegate.flush(new ResponseHandler() {
186-
private Throwable error;
186+
boolean notifyHandler = true;
187187

188188
@Override
189189
public void onError(Throwable throwable) {
190-
if (error == null) {
191-
error = handledError(throwable);
192-
handler.onError(error);
193-
}
190+
handler.onError(handledError(throwable, notifyHandler));
191+
notifyHandler = false;
194192
}
195193

196194
@Override
@@ -306,34 +304,36 @@ public boolean telemetrySupported() {
306304
return delegate.telemetrySupported();
307305
}
308306

309-
private Throwable handledError(Throwable receivedError) {
307+
private Throwable handledError(Throwable receivedError, boolean notifyHandler) {
310308
var error = FutureUtil.completionExceptionCause(receivedError);
311309

312-
if (error instanceof ServiceUnavailableException) {
313-
return handledServiceUnavailableException(((ServiceUnavailableException) error));
314-
} else if (error instanceof ClientException) {
315-
return handledClientException(((ClientException) error));
316-
} else if (error instanceof TransientException) {
317-
return handledTransientException(((TransientException) error));
310+
if (error instanceof ServiceUnavailableException exception) {
311+
return handledServiceUnavailableException(exception, notifyHandler);
312+
} else if (error instanceof ClientException exception) {
313+
return handledClientException(exception, notifyHandler);
314+
} else if (error instanceof TransientException exception) {
315+
return handledTransientException(exception, notifyHandler);
318316
} else {
319317
return error;
320318
}
321319
}
322320

323-
private Throwable handledServiceUnavailableException(ServiceUnavailableException e) {
324-
routingTableHandler.onConnectionFailure(serverAddress());
321+
private Throwable handledServiceUnavailableException(ServiceUnavailableException e, boolean notifyHandler) {
322+
if (notifyHandler) {
323+
routingTableHandler.onConnectionFailure(serverAddress());
324+
}
325325
return new SessionExpiredException(format("Server at %s is no longer available", serverAddress()), e);
326326
}
327327

328-
private Throwable handledTransientException(TransientException e) {
328+
private Throwable handledTransientException(TransientException e, boolean notifyHandler) {
329329
var errorCode = e.code();
330-
if (Objects.equals(errorCode, "Neo.TransientError.General.DatabaseUnavailable")) {
330+
if (Objects.equals(errorCode, "Neo.TransientError.General.DatabaseUnavailable") && notifyHandler) {
331331
routingTableHandler.onConnectionFailure(serverAddress());
332332
}
333333
return e;
334334
}
335335

336-
private Throwable handledClientException(ClientException e) {
336+
private Throwable handledClientException(ClientException e, boolean notifyHandler) {
337337
if (isFailureToWrite(e)) {
338338
// The server is unaware of the session mode, so we have to implement this logic in the driver.
339339
// In the future, we might be able to move this logic to the server.
@@ -349,7 +349,9 @@ private Throwable handledClientException(ClientException e) {
349349
null);
350350
}
351351
case WRITE -> {
352-
routingTableHandler.onWriteFailure(serverAddress());
352+
if (notifyHandler) {
353+
routingTableHandler.onWriteFailure(serverAddress());
354+
}
353355
return new SessionExpiredException(
354356
format("Server at %s no longer accepts writes", serverAddress()));
355357
}

0 commit comments

Comments
 (0)