Skip to content

Commit 61dc8a3

Browse files
committed
Improve transaction failure handling
If Bolt exchange fails for whatever reason, subsequent runs should emit a failed transaction error.
1 parent dc3c79d commit 61dc8a3

File tree

4 files changed

+34
-19
lines changed

4 files changed

+34
-19
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/TerminationAwareResponseHandler.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ final class TerminationAwareResponseHandler extends DelegatingResponseHandler {
2828
private final Logger log;
2929
private final TerminationAwareStateLockingExecutor executor;
3030
private final Consumer<Throwable> throwableConsumer;
31+
private boolean hadError;
3132

3233
TerminationAwareResponseHandler(
3334
Logging logging,
@@ -42,6 +43,7 @@ final class TerminationAwareResponseHandler extends DelegatingResponseHandler {
4243

4344
@Override
4445
public void onError(Throwable throwable) {
46+
hadError = true;
4547
throwableConsumer.accept(Futures.completionExceptionCause(throwable));
4648
super.onError(throwable);
4749
}
@@ -50,10 +52,16 @@ public void onError(Throwable throwable) {
5052
public void onComplete() {
5153
var throwable = executor.execute(Function.identity());
5254
if (throwable != null) {
53-
log.trace(
54-
"Reporting an existing %s error to delegate",
55-
throwable.getClass().getCanonicalName());
56-
delegate.onError(throwable);
55+
if (hadError) {
56+
log.trace(
57+
"Skipping reporting %s error because an actual error has already been reported",
58+
throwable.getClass().getCanonicalName());
59+
} else {
60+
log.trace(
61+
"Reporting an existing %s error to delegate",
62+
throwable.getClass().getCanonicalName());
63+
delegate.onError(throwable);
64+
}
5765
}
5866
log.trace("Completing delegate");
5967
delegate.onComplete();

driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,10 @@ public CompletionStage<Void> terminateAsync() {
320320

321321
@Override
322322
public <T> T execute(Function<Throwable, T> causeOfTerminationConsumer) {
323-
return executeWithLock(lock, () -> causeOfTerminationConsumer.apply(causeOfTermination));
323+
return executeWithLock(lock, () -> {
324+
var throwable = causeOfTermination == null ? null : failedTxException(causeOfTermination);
325+
return causeOfTerminationConsumer.apply(throwable);
326+
});
324327
}
325328

326329
private void ensureCanRunQueries() {
@@ -347,15 +350,7 @@ private void ensureCanRunQueries() {
347350
if (causeOfTermination instanceof TransactionTerminatedException transactionTerminatedException) {
348351
throw transactionTerminatedException;
349352
} else {
350-
var message =
351-
"Cannot run more queries in this transaction, it has either experienced an fatal error or was explicitly terminated";
352-
throw new TransactionTerminatedException(
353-
GqlStatusError.UNKNOWN.getStatus(),
354-
GqlStatusError.UNKNOWN.getStatusDescription(message),
355-
"N/A",
356-
message,
357-
GqlStatusError.DIAGNOSTIC_RECORD,
358-
causeOfTermination);
353+
throw failedTxException(causeOfTermination);
359354
}
360355
} else if (commitFuture != null) {
361356
var message = "Cannot run more queries in this transaction, it is being committed";
@@ -578,6 +573,18 @@ private CompletionStage<Void> closeAsync(boolean commit, boolean completeWithNul
578573
return stage;
579574
}
580575

576+
private static TransactionTerminatedException failedTxException(Throwable cause) {
577+
var message =
578+
"Cannot run more queries in this transaction, it has either experienced a fatal error or was explicitly terminated";
579+
return new TransactionTerminatedException(
580+
GqlStatusError.UNKNOWN.getStatus(),
581+
GqlStatusError.UNKNOWN.getStatusDescription(message),
582+
"N/A",
583+
message,
584+
GqlStatusError.DIAGNOSTIC_RECORD,
585+
cause);
586+
}
587+
581588
private static class BeginResponseHandler implements DriverResponseHandler {
582589
final CompletableFuture<UnmanagedTransaction> summaryFuture = new CompletableFuture<>();
583590
private final ApiTelemetryWork apiTelemetryWork;

driver/src/test/java/org/neo4j/driver/integration/TransactionIT.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ void shouldPreventPullAfterTransactionTermination(boolean iterate) {
422422
result.list();
423423
}
424424
});
425-
assertEquals(terminationException, exception);
425+
assertEquals(terminationException, exception.getCause());
426426
}
427427
tx.close();
428428
}
@@ -441,8 +441,8 @@ void shouldPreventDiscardAfterTransactionTermination() {
441441

442442
// Then
443443
for (var result : List.of(result0, result1)) {
444-
var exception = assertThrows(ClientException.class, result::consume);
445-
assertEquals(terminationException, exception);
444+
var exception = assertThrows(TransactionTerminatedException.class, result::consume);
445+
assertEquals(terminationException, exception.getCause());
446446
}
447447
tx.close();
448448
}

driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveTransactionIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ void shouldPreventPullAfterTransactionTermination() {
6363
for (var result : List.of(result0, result1)) {
6464
var exception = assertThrows(
6565
ClientException.class, () -> Flux.from(result.records()).blockFirst());
66-
assertEquals(terminationException, exception);
66+
assertEquals(terminationException, exception.getCause());
6767
}
6868
Mono.fromDirect(tx.close()).block();
6969
}
@@ -92,7 +92,7 @@ void shouldPreventDiscardAfterTransactionTermination() {
9292
for (var result : List.of(result0, result1)) {
9393
var exception = assertThrows(ClientException.class, () -> Mono.fromDirect(result.consume())
9494
.block());
95-
assertEquals(terminationException, exception);
95+
assertEquals(terminationException, exception.getCause());
9696
}
9797
Mono.fromDirect(tx.close()).block();
9898
}

0 commit comments

Comments
 (0)