Skip to content

Commit 8ce5344

Browse files
committed
Update handling of cancellation state in reactive result
1 parent 5a7afbb commit 8ce5344

File tree

2 files changed

+20
-7
lines changed

2 files changed

+20
-7
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ public CompletionStage<ResultCursor> runAsync(Query query, TransactionConfig con
199199
public CompletionStage<RxResultCursor> runRx(
200200
Query query, TransactionConfig config, CompletionStage<RxResultCursor> cursorPublishStage) {
201201
ensureSessionIsOpen();
202-
return ensureNoOpenTxBeforeRunningQuery()
202+
var newResultCursorStage = ensureNoOpenTxBeforeRunningQuery()
203203
.thenCompose(ignore -> acquireConnection(mode))
204204
.thenCompose(connection -> {
205205
var parameters = query.parameters().asMap(Values::value);
@@ -244,9 +244,12 @@ public CompletionStage<RxResultCursor> runRx(
244244
}
245245
})
246246
.thenCompose(Function.identity());
247-
resultCursorStage = cursorStage.exceptionally(error -> null);
248247
return cursorStage.thenApply(Function.identity());
249248
});
249+
resultCursorStage = newResultCursorStage
250+
.thenCompose(cursor -> cursor == null ? CompletableFuture.completedFuture(null) : cursorPublishStage)
251+
.exceptionally(throwable -> null);
252+
return newResultCursorStage;
250253
}
251254

252255
public CompletionStage<UnmanagedTransaction> beginTransactionAsync(

driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -288,11 +288,18 @@ public CompletionStage<ResultSummary> summaryAsync() {
288288

289289
@Override
290290
public CompletionStage<Void> rollback() {
291-
log.trace("[%d] Rolling back unpublished result", hashCode());
292291
synchronized (this) {
293-
state = State.SUCCEEDED;
292+
log.trace("[%d] Rolling back unpublished result %s state", hashCode(), state);
293+
switch (state) {
294+
case READY -> state = State.SUCCEEDED;
295+
case STREAMING, DISCARDING -> {
296+
return summaryFuture.thenApply(ignored -> null);
297+
}
298+
case FAILED, SUCCEEDED -> {
299+
return CompletableFuture.completedFuture(null);
300+
}
301+
}
294302
}
295-
completeSummaryFuture(null, null);
296303
var resetFuture = new CompletableFuture<Void>();
297304
boltConnection
298305
.reset()
@@ -319,14 +326,17 @@ public void onComplete() {
319326
resetFuture.completeExceptionally(throwable);
320327
}
321328
});
322-
return resetFuture.thenCompose(ignored -> boltConnection.close()).exceptionally(throwable -> null);
329+
return resetFuture
330+
.thenCompose(ignored -> boltConnection.close())
331+
.whenComplete((ignored, throwable) -> completeSummaryFuture(null, null))
332+
.exceptionally(throwable -> null);
323333
}
324334

325335
@Override
326336
public void onComplete() {
327-
log.trace("[%d] onComplete", hashCode());
328337
Runnable runnable;
329338
synchronized (this) {
339+
log.trace("[%d] onComplete", hashCode());
330340
var throwable = interruptSupplier.get();
331341
if (throwable != null) {
332342
handleError(throwable, true);

0 commit comments

Comments
 (0)