Skip to content

Commit 0b3a03c

Browse files
committed
Update RxResultCursorImpl
1 parent d70759c commit 0b3a03c

File tree

6 files changed

+659
-369
lines changed

6 files changed

+659
-369
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ public CompletionStage<RxResultCursor> runRx(
207207
apiTelemetryWork.setEnabled(!telemetryDisabled);
208208
var runFailed = new AtomicBoolean(false);
209209
var responseHandler =
210-
new RunRxResponseHandler(connection, query, this::handleNewBookmark, runFailed);
210+
new RunRxResponseHandler(logging, connection, query, this::handleNewBookmark, runFailed);
211211
var cursorStage = apiTelemetryWork
212212
.pipelineTelemetryIfEnabled(connection)
213213
.thenCompose(conn -> conn.runInAutoCommitTransaction(
@@ -808,6 +808,7 @@ public AuthToken overrideAuthToken() {
808808

809809
public static class RunRxResponseHandler implements ResponseHandler {
810810
final CompletableFuture<RxResultCursor> cursorFuture = new CompletableFuture<>();
811+
private final Logging logging;
811812
private final BoltConnection connection;
812813
private final Query query;
813814
private final Consumer<DatabaseBookmark> bookmarkConsumer;
@@ -817,10 +818,12 @@ public static class RunRxResponseHandler implements ResponseHandler {
817818
private int ignoredCount;
818819

819820
public RunRxResponseHandler(
821+
Logging logging,
820822
BoltConnection connection,
821823
Query query,
822824
Consumer<DatabaseBookmark> bookmarkConsumer,
823825
AtomicBoolean runFailed) {
826+
this.logging = logging;
824827
this.connection = connection;
825828
this.query = query;
826829
this.bookmarkConsumer = bookmarkConsumer;
@@ -867,11 +870,11 @@ public void onComplete() {
867870
query,
868871
runSummary,
869872
error,
870-
() -> null,
871873
bookmarkConsumer,
872874
(ignored) -> {},
873875
true,
874-
() -> null));
876+
() -> null,
877+
logging));
875878
} else {
876879
var message = ignoredCount > 0
877880
? "Run exchange contains ignored messages."

driver/src/main/java/org/neo4j/driver/internal/async/ResultCursorsHolder.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.CompletableFuture;
2626
import java.util.concurrent.CompletionStage;
2727
import org.neo4j.driver.internal.FailableCursor;
28+
import org.neo4j.driver.internal.util.Futures;
2829

2930
public class ResultCursorsHolder {
3031
private final List<CompletionStage<? extends FailableCursor>> cursorStages = new ArrayList<>();
@@ -35,8 +36,11 @@ void add(CompletionStage<? extends FailableCursor> cursorStage) {
3536
cursorStages.add(cursorStage);
3637
}
3738
cursorStage.thenCompose(FailableCursor::consumed).whenComplete((ignored, throwable) -> {
39+
throwable = Futures.completionExceptionCause(throwable);
3840
synchronized (this) {
39-
cursorStages.remove(cursorStage);
41+
if (throwable == null) {
42+
cursorStages.remove(cursorStage);
43+
}
4044
}
4145
});
4246
}

driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ private enum State {
101101
"Can't rollback, transaction has been requested to be committed";
102102
private static final EnumSet<State> OPEN_STATES = EnumSet.of(State.ACTIVE, State.TERMINATED);
103103

104+
private final Logging logging;
104105
private final TerminationAwareBoltConnection connection;
105106
private final Consumer<DatabaseBookmark> bookmarkConsumer;
106107
private final ResultCursorsHolder resultCursors;
@@ -153,6 +154,7 @@ protected UnmanagedTransaction(
153154
NotificationConfig notificationConfig,
154155
ApiTelemetryWork apiTelemetryWork,
155156
Logging logging) {
157+
this.logging = logging;
156158
this.connection = new TerminationAwareBoltConnection(connection, this);
157159
this.databaseName = databaseName;
158160
this.accessMode = accessMode;
@@ -254,6 +256,7 @@ public CompletionStage<RxResultCursor> runRx(Query query) {
254256
ensureCanRunQueries();
255257
var parameters = query.parameters().asMap(Values::value);
256258
var responseHandler = new RunRxResponseHandler(
259+
logging,
257260
apiTelemetryWork,
258261
() -> executeWithLock(lock, () -> causeOfTermination),
259262
this::markTerminated,
@@ -673,6 +676,7 @@ public void onComplete() {
673676

674677
private static class RunRxResponseHandler implements ResponseHandler {
675678
final CompletableFuture<RxResultCursor> cursorFuture = new CompletableFuture<>();
679+
private final Logging logging;
676680
private final ApiTelemetryWork apiTelemetryWork;
677681
private final Supplier<Throwable> termSupplier;
678682
private final Consumer<Throwable> markTerminated;
@@ -685,13 +689,15 @@ private static class RunRxResponseHandler implements ResponseHandler {
685689
private int ignoredCount;
686690

687691
private RunRxResponseHandler(
692+
Logging logging,
688693
ApiTelemetryWork apiTelemetryWork,
689694
Supplier<Throwable> termSupplier,
690695
Consumer<Throwable> markTerminated,
691696
CompletableFuture<UnmanagedTransaction> beginFuture,
692697
UnmanagedTransaction transaction,
693698
BoltConnection connection,
694699
Query query) {
700+
this.logging = logging;
695701
this.apiTelemetryWork = apiTelemetryWork;
696702
this.termSupplier = termSupplier;
697703
this.markTerminated = markTerminated;
@@ -747,11 +753,11 @@ public void onComplete() {
747753
query,
748754
null,
749755
error,
750-
termSupplier,
751756
bookmark -> {},
752757
transaction::markTerminated,
753758
false,
754-
termSupplier));
759+
termSupplier,
760+
logging));
755761
}
756762
} else {
757763
if (runSummary != null) {
@@ -760,11 +766,11 @@ public void onComplete() {
760766
query,
761767
runSummary,
762768
null,
763-
termSupplier,
764769
bookmark -> {},
765770
transaction::markTerminated,
766771
false,
767-
termSupplier));
772+
termSupplier,
773+
logging));
768774
} else {
769775
var throwable = termSupplier.get();
770776
if (throwable == null) {

0 commit comments

Comments
 (0)