From b5098b0012761bbbedf2d23f5a491eb007058497 Mon Sep 17 00:00:00 2001 From: Dmitriy Tverdiakov Date: Tue, 24 Sep 2024 22:01:43 +0100 Subject: [PATCH] Delete results from transaction results holder when fully consumed --- .../neo4j/driver/internal/FailableCursor.java | 2 + .../internal/async/ResultCursorsHolder.java | 25 +++++++++---- .../cursor/AsyncResultCursorImpl.java | 19 +++++++++- .../cursor/DisposableAsyncResultCursor.java | 5 +++ .../internal/cursor/RxResultCursorImpl.java | 20 +++++++++- .../async/ResultCursorsHolderTest.java | 37 +++++++++++++++++++ .../async/UnmanagedTransactionTest.java | 1 + 7 files changed, 99 insertions(+), 10 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/FailableCursor.java b/driver/src/main/java/org/neo4j/driver/internal/FailableCursor.java index 9d3a4dfa56..0c59a0644b 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/FailableCursor.java +++ b/driver/src/main/java/org/neo4j/driver/internal/FailableCursor.java @@ -28,4 +28,6 @@ public interface FailableCursor { * Pulling all unconsumed records into memory and returning failure if there is any pull errors. */ CompletionStage pullAllFailureAsync(); + + CompletionStage consumed(); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/ResultCursorsHolder.java b/driver/src/main/java/org/neo4j/driver/internal/async/ResultCursorsHolder.java index 27f5a718c2..9322cd08e3 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/ResultCursorsHolder.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/ResultCursorsHolder.java @@ -20,7 +20,6 @@ import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; @@ -28,22 +27,32 @@ import org.neo4j.driver.internal.FailableCursor; public class ResultCursorsHolder { - private final List> cursorStages = - Collections.synchronizedList(new ArrayList<>()); + private final List> cursorStages = new ArrayList<>(); - public void add(CompletionStage cursorStage) { + void add(CompletionStage cursorStage) { Objects.requireNonNull(cursorStage); - cursorStages.add(cursorStage); + synchronized (this) { + cursorStages.add(cursorStage); + } + cursorStage.thenCompose(FailableCursor::consumed).whenComplete((ignored, throwable) -> { + synchronized (this) { + cursorStages.remove(cursorStage); + } + }); } CompletionStage retrieveNotConsumedError() { - var failures = retrieveAllFailures(); - + List> cursorStages; + synchronized (this) { + cursorStages = List.copyOf(this.cursorStages); + } + var failures = retrieveAllFailures(cursorStages); return CompletableFuture.allOf(failures).thenApply(ignore -> findFirstFailure(failures)); } @SuppressWarnings("unchecked") - private CompletableFuture[] retrieveAllFailures() { + private static CompletableFuture[] retrieveAllFailures( + List> cursorStages) { return cursorStages.stream() .map(ResultCursorsHolder::retrieveFailure) .map(CompletionStage::toCompletableFuture) diff --git a/driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncResultCursorImpl.java b/driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncResultCursorImpl.java index cdf0d59e81..5376ca9a27 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncResultCursorImpl.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncResultCursorImpl.java @@ -32,6 +32,7 @@ public class AsyncResultCursorImpl implements AsyncResultCursor { private final Throwable runError; private final RunResponseHandler runHandler; private final PullAllResponseHandler pullAllHandler; + private final CompletableFuture consumedFuture = new CompletableFuture<>(); public AsyncResultCursorImpl( Throwable runError, RunResponseHandler runHandler, PullAllResponseHandler pullAllHandler) { @@ -47,7 +48,18 @@ public List keys() { @Override public CompletionStage consumeAsync() { - return pullAllHandler.consumeAsync(); + var summaryFuture = new CompletableFuture(); + pullAllHandler.consumeAsync().whenComplete((summary, throwable) -> { + throwable = Futures.completionExceptionCause(throwable); + if (throwable != null) { + consumedFuture.completeExceptionally(throwable); + summaryFuture.completeExceptionally(throwable); + } else { + consumedFuture.complete(null); + summaryFuture.complete(summary); + } + }); + return summaryFuture; } @Override @@ -138,4 +150,9 @@ private void internalForEachAsync(Consumer action, CompletableFuture mapSuccessfulRunCompletionAsync() { return runError != null ? Futures.failedFuture(runError) : CompletableFuture.completedFuture(this); } + + @Override + public CompletableFuture consumed() { + return consumedFuture; + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/cursor/DisposableAsyncResultCursor.java b/driver/src/main/java/org/neo4j/driver/internal/cursor/DisposableAsyncResultCursor.java index 610582e452..4bd6cc227d 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cursor/DisposableAsyncResultCursor.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cursor/DisposableAsyncResultCursor.java @@ -96,6 +96,11 @@ public CompletionStage pullAllFailureAsync() { return delegate.pullAllFailureAsync(); } + @Override + public CompletionStage consumed() { + return delegate.consumed(); + } + private CompletableFuture assertNotDisposed() { if (isDisposed) { return failedFuture(newResultConsumedError()); diff --git a/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java b/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java index 63358fa0ce..f0d663b4dc 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java @@ -31,6 +31,7 @@ import org.neo4j.driver.exceptions.TransactionNestingException; import org.neo4j.driver.internal.handlers.RunResponseHandler; import org.neo4j.driver.internal.handlers.pulln.PullResponseHandler; +import org.neo4j.driver.internal.util.Futures; import org.neo4j.driver.summary.ResultSummary; public class RxResultCursorImpl implements RxResultCursor { @@ -46,6 +47,7 @@ public class RxResultCursorImpl implements RxResultCursor { private boolean summaryFutureExposed; private boolean resultConsumed; private RecordConsumerStatus consumerStatus = NOT_INSTALLED; + private final CompletableFuture consumedFuture = new CompletableFuture<>(); // for testing only public RxResultCursorImpl(RunResponseHandler runHandler, PullResponseHandler pullHandler) { @@ -119,10 +121,26 @@ public CompletionStage pullAllFailureAsync() { return discardAllFailureAsync(); } + @Override + public CompletionStage consumed() { + return consumedFuture; + } + @Override public CompletionStage summaryAsync() { summaryFutureExposed = true; - return summaryStage(); + var summaryFuture = new CompletableFuture(); + summaryStage().whenComplete((summary, throwable) -> { + throwable = Futures.completionExceptionCause(throwable); + if (throwable != null) { + consumedFuture.completeExceptionally(throwable); + summaryFuture.completeExceptionally(throwable); + } else { + consumedFuture.complete(null); + summaryFuture.complete(summary); + } + }); + return summaryFuture; } @Override diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/ResultCursorsHolderTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/ResultCursorsHolderTest.java index 571c3ecf14..d8635bbc0c 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/ResultCursorsHolderTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/ResultCursorsHolderTest.java @@ -22,6 +22,8 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.neo4j.driver.testutil.TestUtil.await; @@ -30,7 +32,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeoutException; +import java.util.stream.IntStream; import org.junit.jupiter.api.Test; +import org.neo4j.driver.internal.FailableCursor; import org.neo4j.driver.internal.cursor.AsyncResultCursorImpl; import org.neo4j.driver.internal.util.Futures; @@ -124,6 +128,38 @@ void shouldWaitForAllFailuresToArrive() { assertEquals(error1, await(failureFuture)); } + @Test + void shouldRemoveConsumedResults() { + var holder = new ResultCursorsHolder(); + var list = IntStream.range(0, 100) + .mapToObj(i -> { + var cursor = mock(FailableCursor.class); + var consume = new CompletableFuture(); + given(cursor.consumed()).willReturn(consume); + holder.add(CompletableFuture.completedFuture(cursor)); + if (i % 2 == 0) { + consume.complete(null); + given(cursor.discardAllFailureAsync()) + .willReturn(CompletableFuture.failedFuture(new RuntimeException())); + } else { + given(cursor.discardAllFailureAsync()).willReturn(CompletableFuture.completedStage(null)); + } + return cursor; + }) + .toList(); + + holder.retrieveNotConsumedError().toCompletableFuture().join(); + + for (var i = 0; i < list.size(); i++) { + var cursor = list.get(i); + then(cursor).should().consumed(); + if (i % 2 == 1) { + then(cursor).should().discardAllFailureAsync(); + } + then(cursor).shouldHaveNoMoreInteractions(); + } + } + private static CompletionStage cursorWithoutError() { return cursorWithError(null); } @@ -134,6 +170,7 @@ private static CompletionStage cursorWithError(Throwable private static CompletionStage cursorWithFailureFuture(CompletableFuture future) { var cursor = mock(AsyncResultCursorImpl.class); + when(cursor.consumed()).thenReturn(new CompletableFuture<>()); when(cursor.discardAllFailureAsync()).thenReturn(future); return completedFuture(cursor); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java index 99dfeb1633..cee78f8b33 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java @@ -724,6 +724,7 @@ private static Connection connectionWithBegin(Consumer beginBeh private ResultCursorsHolder mockResultCursorWith(ClientException clientException) { var resultCursorsHolder = new ResultCursorsHolder(); var cursor = mock(FailableCursor.class); + given(cursor.consumed()).willReturn(new CompletableFuture<>()); doReturn(completedFuture(clientException)).when(cursor).discardAllFailureAsync(); resultCursorsHolder.add(completedFuture(cursor)); return resultCursorsHolder;