Skip to content

Commit 9c0518e

Browse files
authored
Delete results from transaction results holder when fully consumed (#1571)
1 parent 5c9a7c2 commit 9c0518e

File tree

7 files changed

+99
-10
lines changed

7 files changed

+99
-10
lines changed

driver/src/main/java/org/neo4j/driver/internal/FailableCursor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,6 @@ public interface FailableCursor {
2828
* Pulling all unconsumed records into memory and returning failure if there is any pull errors.
2929
*/
3030
CompletionStage<Throwable> pullAllFailureAsync();
31+
32+
CompletionStage<Void> consumed();
3133
}

driver/src/main/java/org/neo4j/driver/internal/async/ResultCursorsHolder.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,30 +20,39 @@
2020

2121
import java.util.ArrayList;
2222
import java.util.Arrays;
23-
import java.util.Collections;
2423
import java.util.List;
2524
import java.util.Objects;
2625
import java.util.concurrent.CompletableFuture;
2726
import java.util.concurrent.CompletionStage;
2827
import org.neo4j.driver.internal.FailableCursor;
2928

3029
public class ResultCursorsHolder {
31-
private final List<CompletionStage<? extends FailableCursor>> cursorStages =
32-
Collections.synchronizedList(new ArrayList<>());
30+
private final List<CompletionStage<? extends FailableCursor>> cursorStages = new ArrayList<>();
3331

34-
public void add(CompletionStage<? extends FailableCursor> cursorStage) {
32+
void add(CompletionStage<? extends FailableCursor> cursorStage) {
3533
Objects.requireNonNull(cursorStage);
36-
cursorStages.add(cursorStage);
34+
synchronized (this) {
35+
cursorStages.add(cursorStage);
36+
}
37+
cursorStage.thenCompose(FailableCursor::consumed).whenComplete((ignored, throwable) -> {
38+
synchronized (this) {
39+
cursorStages.remove(cursorStage);
40+
}
41+
});
3742
}
3843

3944
CompletionStage<Throwable> retrieveNotConsumedError() {
40-
var failures = retrieveAllFailures();
41-
45+
List<CompletionStage<? extends FailableCursor>> cursorStages;
46+
synchronized (this) {
47+
cursorStages = List.copyOf(this.cursorStages);
48+
}
49+
var failures = retrieveAllFailures(cursorStages);
4250
return CompletableFuture.allOf(failures).thenApply(ignore -> findFirstFailure(failures));
4351
}
4452

4553
@SuppressWarnings("unchecked")
46-
private CompletableFuture<Throwable>[] retrieveAllFailures() {
54+
private static CompletableFuture<Throwable>[] retrieveAllFailures(
55+
List<CompletionStage<? extends FailableCursor>> cursorStages) {
4756
return cursorStages.stream()
4857
.map(ResultCursorsHolder::retrieveFailure)
4958
.map(CompletionStage::toCompletableFuture)

driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncResultCursorImpl.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public class AsyncResultCursorImpl implements AsyncResultCursor {
3232
private final Throwable runError;
3333
private final RunResponseHandler runHandler;
3434
private final PullAllResponseHandler pullAllHandler;
35+
private final CompletableFuture<Void> consumedFuture = new CompletableFuture<>();
3536

3637
public AsyncResultCursorImpl(
3738
Throwable runError, RunResponseHandler runHandler, PullAllResponseHandler pullAllHandler) {
@@ -47,7 +48,18 @@ public List<String> keys() {
4748

4849
@Override
4950
public CompletionStage<ResultSummary> consumeAsync() {
50-
return pullAllHandler.consumeAsync();
51+
var summaryFuture = new CompletableFuture<ResultSummary>();
52+
pullAllHandler.consumeAsync().whenComplete((summary, throwable) -> {
53+
throwable = Futures.completionExceptionCause(throwable);
54+
if (throwable != null) {
55+
consumedFuture.completeExceptionally(throwable);
56+
summaryFuture.completeExceptionally(throwable);
57+
} else {
58+
consumedFuture.complete(null);
59+
summaryFuture.complete(summary);
60+
}
61+
});
62+
return summaryFuture;
5163
}
5264

5365
@Override
@@ -138,4 +150,9 @@ private void internalForEachAsync(Consumer<Record> action, CompletableFuture<Voi
138150
public CompletableFuture<AsyncResultCursor> mapSuccessfulRunCompletionAsync() {
139151
return runError != null ? Futures.failedFuture(runError) : CompletableFuture.completedFuture(this);
140152
}
153+
154+
@Override
155+
public CompletableFuture<Void> consumed() {
156+
return consumedFuture;
157+
}
141158
}

driver/src/main/java/org/neo4j/driver/internal/cursor/DisposableAsyncResultCursor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,11 @@ public CompletionStage<Throwable> pullAllFailureAsync() {
9696
return delegate.pullAllFailureAsync();
9797
}
9898

99+
@Override
100+
public CompletionStage<Void> consumed() {
101+
return delegate.consumed();
102+
}
103+
99104
private <T> CompletableFuture<T> assertNotDisposed() {
100105
if (isDisposed) {
101106
return failedFuture(newResultConsumedError());

driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.neo4j.driver.exceptions.TransactionNestingException;
3232
import org.neo4j.driver.internal.handlers.RunResponseHandler;
3333
import org.neo4j.driver.internal.handlers.pulln.PullResponseHandler;
34+
import org.neo4j.driver.internal.util.Futures;
3435
import org.neo4j.driver.summary.ResultSummary;
3536

3637
public class RxResultCursorImpl implements RxResultCursor {
@@ -46,6 +47,7 @@ public class RxResultCursorImpl implements RxResultCursor {
4647
private boolean summaryFutureExposed;
4748
private boolean resultConsumed;
4849
private RecordConsumerStatus consumerStatus = NOT_INSTALLED;
50+
private final CompletableFuture<Void> consumedFuture = new CompletableFuture<>();
4951

5052
// for testing only
5153
public RxResultCursorImpl(RunResponseHandler runHandler, PullResponseHandler pullHandler) {
@@ -119,10 +121,26 @@ public CompletionStage<Throwable> pullAllFailureAsync() {
119121
return discardAllFailureAsync();
120122
}
121123

124+
@Override
125+
public CompletionStage<Void> consumed() {
126+
return consumedFuture;
127+
}
128+
122129
@Override
123130
public CompletionStage<ResultSummary> summaryAsync() {
124131
summaryFutureExposed = true;
125-
return summaryStage();
132+
var summaryFuture = new CompletableFuture<ResultSummary>();
133+
summaryStage().whenComplete((summary, throwable) -> {
134+
throwable = Futures.completionExceptionCause(throwable);
135+
if (throwable != null) {
136+
consumedFuture.completeExceptionally(throwable);
137+
summaryFuture.completeExceptionally(throwable);
138+
} else {
139+
consumedFuture.complete(null);
140+
summaryFuture.complete(summary);
141+
}
142+
});
143+
return summaryFuture;
126144
}
127145

128146
@Override

driver/src/test/java/org/neo4j/driver/internal/async/ResultCursorsHolderTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import static org.junit.jupiter.api.Assertions.assertNull;
2323
import static org.junit.jupiter.api.Assertions.assertThrows;
2424
import static org.junit.jupiter.api.Assertions.assertTrue;
25+
import static org.mockito.BDDMockito.given;
26+
import static org.mockito.BDDMockito.then;
2527
import static org.mockito.Mockito.mock;
2628
import static org.mockito.Mockito.when;
2729
import static org.neo4j.driver.testutil.TestUtil.await;
@@ -30,7 +32,9 @@
3032
import java.util.concurrent.CompletableFuture;
3133
import java.util.concurrent.CompletionStage;
3234
import java.util.concurrent.TimeoutException;
35+
import java.util.stream.IntStream;
3336
import org.junit.jupiter.api.Test;
37+
import org.neo4j.driver.internal.FailableCursor;
3438
import org.neo4j.driver.internal.cursor.AsyncResultCursorImpl;
3539
import org.neo4j.driver.internal.util.Futures;
3640

@@ -124,6 +128,38 @@ void shouldWaitForAllFailuresToArrive() {
124128
assertEquals(error1, await(failureFuture));
125129
}
126130

131+
@Test
132+
void shouldRemoveConsumedResults() {
133+
var holder = new ResultCursorsHolder();
134+
var list = IntStream.range(0, 100)
135+
.mapToObj(i -> {
136+
var cursor = mock(FailableCursor.class);
137+
var consume = new CompletableFuture<Void>();
138+
given(cursor.consumed()).willReturn(consume);
139+
holder.add(CompletableFuture.completedFuture(cursor));
140+
if (i % 2 == 0) {
141+
consume.complete(null);
142+
given(cursor.discardAllFailureAsync())
143+
.willReturn(CompletableFuture.failedFuture(new RuntimeException()));
144+
} else {
145+
given(cursor.discardAllFailureAsync()).willReturn(CompletableFuture.completedStage(null));
146+
}
147+
return cursor;
148+
})
149+
.toList();
150+
151+
holder.retrieveNotConsumedError().toCompletableFuture().join();
152+
153+
for (var i = 0; i < list.size(); i++) {
154+
var cursor = list.get(i);
155+
then(cursor).should().consumed();
156+
if (i % 2 == 1) {
157+
then(cursor).should().discardAllFailureAsync();
158+
}
159+
then(cursor).shouldHaveNoMoreInteractions();
160+
}
161+
}
162+
127163
private static CompletionStage<AsyncResultCursorImpl> cursorWithoutError() {
128164
return cursorWithError(null);
129165
}
@@ -134,6 +170,7 @@ private static CompletionStage<AsyncResultCursorImpl> cursorWithError(Throwable
134170

135171
private static CompletionStage<AsyncResultCursorImpl> cursorWithFailureFuture(CompletableFuture<Throwable> future) {
136172
var cursor = mock(AsyncResultCursorImpl.class);
173+
when(cursor.consumed()).thenReturn(new CompletableFuture<>());
137174
when(cursor.discardAllFailureAsync()).thenReturn(future);
138175
return completedFuture(cursor);
139176
}

driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -724,6 +724,7 @@ private static Connection connectionWithBegin(Consumer<ResponseHandler> beginBeh
724724
private ResultCursorsHolder mockResultCursorWith(ClientException clientException) {
725725
var resultCursorsHolder = new ResultCursorsHolder();
726726
var cursor = mock(FailableCursor.class);
727+
given(cursor.consumed()).willReturn(new CompletableFuture<>());
727728
doReturn(completedFuture(clientException)).when(cursor).discardAllFailureAsync();
728729
resultCursorsHolder.add(completedFuture(cursor));
729730
return resultCursorsHolder;

0 commit comments

Comments
 (0)