From 34d8511c17fe611c57d4e6a15d3d8d7430c68478 Mon Sep 17 00:00:00 2001 From: Dmitriy Tverdiakov Date: Wed, 12 Jul 2023 19:50:43 +0100 Subject: [PATCH] Add support for cancellation on reactive session run This update brings support for cancelling subscription on reactive session `run` methods, like: - `org.neo4j.driver.reactive.ReactiveSession.run(..)` - `org.neo4j.driver.reactivestreams.ReactiveSession.run(..)` If subscription is cancelled before an instance of the `ReactiveResult` is published, the driver will rollback the initiated query by sending the Bolt `RESET` message and will release the network connection back to its connection pool. This will be done in the background. Once the `ReactiveResult` is published, the cancellation with the rollback is no longer possible. In addition, this update makes the driver wait until subscriber subscribes to the publisher before dispatching the query to the server. Previously, the driver would do that immediately upon calling the `run` method. --- .../driver/internal/async/NetworkSession.java | 7 +- .../cursor/ResultCursorFactoryImpl.java | 3 +- .../internal/cursor/RxResultCursor.java | 9 ++ .../internal/cursor/RxResultCursorImpl.java | 18 ++- .../reactive/AbstractReactiveSession.java | 72 +++++++++++ .../reactive/InternalReactiveSession.java | 29 +---- .../internal/reactive/InternalRxSession.java | 2 +- .../driver/internal/reactive/RxUtils.java | 2 +- .../InternalReactiveSession.java | 29 +---- .../reactive/ReactiveSessionIT.java | 121 +++++++++++++++++- .../reactive/ReactiveStreamsSessionIT.java | 99 ++++++++++++++ .../internal/async/NetworkSessionTest.java | 3 +- .../cursor/RxResultCursorImplTest.java | 5 +- .../reactive/InternalReactiveSessionTest.java | 11 +- .../reactive/InternalRxSessionTest.java | 10 +- .../driver/testutil/DatabaseExtension.java | 4 + 16 files changed, 346 insertions(+), 78 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java index c3ec067d01..cf4ee10734 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java @@ -122,10 +122,13 @@ public CompletionStage runAsync(Query query, TransactionConfig con .thenApply(cursor -> cursor); // convert the return type } - public CompletionStage runRx(Query query, TransactionConfig config) { + public CompletionStage runRx( + Query query, TransactionConfig config, CompletionStage cursorPublishStage) { var newResultCursorStage = buildResultCursorFactory(query, config).thenCompose(ResultCursorFactory::rxResult); - resultCursorStage = newResultCursorStage.exceptionally(error -> null); + resultCursorStage = newResultCursorStage + .thenCompose(cursor -> cursor == null ? CompletableFuture.completedFuture(null) : cursorPublishStage) + .exceptionally(throwable -> null); return newResultCursorStage; } diff --git a/driver/src/main/java/org/neo4j/driver/internal/cursor/ResultCursorFactoryImpl.java b/driver/src/main/java/org/neo4j/driver/internal/cursor/ResultCursorFactoryImpl.java index 880be18f3a..c7f51952b5 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cursor/ResultCursorFactoryImpl.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cursor/ResultCursorFactoryImpl.java @@ -74,6 +74,7 @@ public CompletionStage asyncResult() { @Override public CompletionStage rxResult() { connection.writeAndFlush(runMessage, runHandler); - return runFuture.handle((ignored, error) -> new RxResultCursorImpl(error, runHandler, pullHandler)); + return runFuture.handle( + (ignored, error) -> new RxResultCursorImpl(error, runHandler, pullHandler, connection::release)); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursor.java b/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursor.java index d1821b3976..55149e0164 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursor.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursor.java @@ -36,4 +36,13 @@ public interface RxResultCursor extends Subscription, FailableCursor { boolean isDone(); Throwable getRunError(); + + /** + * Rolls back this instance by releasing connection with RESET. + *

+ * This must never be called on a published instance. + * @return reset completion stage + * @since 5.11 + */ + CompletionStage rollback(); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java b/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java index f2f90443bb..cca63f96c5 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java @@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.BiConsumer; +import java.util.function.Supplier; import org.neo4j.driver.Record; import org.neo4j.driver.exceptions.TransactionNestingException; import org.neo4j.driver.internal.handlers.RunResponseHandler; @@ -41,23 +42,30 @@ public class RxResultCursorImpl implements RxResultCursor { private final RunResponseHandler runHandler; private final PullResponseHandler pullHandler; private final Throwable runResponseError; + private final Supplier> connectionReleaseSupplier; private boolean runErrorSurfaced; private final CompletableFuture summaryFuture = new CompletableFuture<>(); private boolean summaryFutureExposed; private boolean resultConsumed; private RecordConsumerStatus consumerStatus = NOT_INSTALLED; + // for testing only public RxResultCursorImpl(RunResponseHandler runHandler, PullResponseHandler pullHandler) { - this(null, runHandler, pullHandler); + this(null, runHandler, pullHandler, () -> CompletableFuture.completedFuture(null)); } - public RxResultCursorImpl(Throwable runError, RunResponseHandler runHandler, PullResponseHandler pullHandler) { + public RxResultCursorImpl( + Throwable runError, + RunResponseHandler runHandler, + PullResponseHandler pullHandler, + Supplier> connectionReleaseSupplier) { Objects.requireNonNull(runHandler); Objects.requireNonNull(pullHandler); this.runResponseError = runError; this.runHandler = runHandler; this.pullHandler = pullHandler; + this.connectionReleaseSupplier = connectionReleaseSupplier; installSummaryConsumer(); } @@ -130,6 +138,12 @@ public Throwable getRunError() { return runResponseError; } + @Override + public CompletionStage rollback() { + summaryFuture.complete(null); + return connectionReleaseSupplier.get(); + } + public CompletionStage summaryStage() { if (!isDone() && !resultConsumed) // the summary is called before record streaming { diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveSession.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveSession.java index 97aff83e06..d03144c313 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveSession.java @@ -23,14 +23,19 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import org.neo4j.driver.AccessMode; import org.neo4j.driver.Bookmark; +import org.neo4j.driver.Query; import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.exceptions.TransactionNestingException; import org.neo4j.driver.internal.async.NetworkSession; import org.neo4j.driver.internal.async.UnmanagedTransaction; +import org.neo4j.driver.internal.cursor.RxResultCursor; import org.neo4j.driver.internal.util.Futures; import org.neo4j.driver.reactive.RxResult; import org.neo4j.driver.reactivestreams.ReactiveResult; @@ -142,6 +147,73 @@ public Set lastBookmarks() { return session.lastBookmarks(); } + protected Publisher run(Query query, TransactionConfig config, Function cursorToResult) { + var cursorPublishFuture = new CompletableFuture(); + var cursorReference = new AtomicReference(); + + return createSingleItemPublisher( + () -> runAsStage(query, config, cursorPublishFuture) + .thenApply(cursor -> { + cursorReference.set(cursor); + return cursor; + }) + .thenApply(cursorToResult), + () -> new IllegalStateException( + "Unexpected condition, run call has completed successfully with result being null"), + value -> { + if (value != null) { + cursorReference.get().rollback().whenComplete((unused, throwable) -> { + if (throwable != null) { + cursorPublishFuture.completeExceptionally(throwable); + } else { + cursorPublishFuture.complete(null); + } + }); + } + }) + .doOnNext(value -> cursorPublishFuture.complete(cursorReference.get())) + .doOnError(cursorPublishFuture::completeExceptionally); + } + + private CompletionStage runAsStage( + Query query, TransactionConfig config, CompletionStage finalStage) { + CompletionStage cursorStage; + try { + cursorStage = session.runRx(query, config, finalStage); + } catch (Throwable t) { + cursorStage = Futures.failedFuture(t); + } + + return cursorStage + .handle((cursor, throwable) -> { + if (throwable != null) { + return this.releaseConnectionAndRethrow(throwable); + } else { + var runError = cursor.getRunError(); + if (runError != null) { + return this.releaseConnectionAndRethrow(runError); + } else { + return CompletableFuture.completedFuture(cursor); + } + } + }) + .thenCompose(stage -> stage); + } + + private CompletionStage releaseConnectionAndRethrow(Throwable throwable) { + return session.releaseConnectionAsync().handle((ignored, releaseThrowable) -> { + if (releaseThrowable != null) { + throw Futures.combineErrors(throwable, releaseThrowable); + } else { + if (throwable instanceof RuntimeException e) { + throw e; + } else { + throw new CompletionException(throwable); + } + } + }); + } + protected Publisher doClose() { return createEmptyPublisher(session::closeAsync); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveSession.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveSession.java index e7b7e4682b..67fb3736ac 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveSession.java @@ -23,7 +23,6 @@ import java.util.HashSet; import java.util.Set; -import java.util.concurrent.CompletionStage; import java.util.concurrent.Flow.Publisher; import org.neo4j.driver.AccessMode; import org.neo4j.driver.Bookmark; @@ -31,13 +30,10 @@ import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.internal.async.NetworkSession; import org.neo4j.driver.internal.async.UnmanagedTransaction; -import org.neo4j.driver.internal.cursor.RxResultCursor; -import org.neo4j.driver.internal.util.Futures; import org.neo4j.driver.reactive.ReactiveResult; import org.neo4j.driver.reactive.ReactiveSession; import org.neo4j.driver.reactive.ReactiveTransaction; import org.neo4j.driver.reactive.ReactiveTransactionCallback; -import reactor.core.publisher.Mono; public class InternalReactiveSession extends AbstractReactiveSession implements ReactiveSession, BaseReactiveQueryRunner { @@ -89,30 +85,7 @@ public Publisher run(Query query) { @Override public Publisher run(Query query, TransactionConfig config) { - CompletionStage cursorStage; - try { - cursorStage = session.runRx(query, config); - } catch (Throwable t) { - cursorStage = Futures.failedFuture(t); - } - - return publisherToFlowPublisher(Mono.fromCompletionStage(cursorStage) - .onErrorResume(error -> Mono.fromCompletionStage(session.releaseConnectionAsync()) - .onErrorMap(releaseError -> Futures.combineErrors(error, releaseError)) - .then(Mono.error(error))) - .flatMap(cursor -> { - Mono publisher; - var runError = cursor.getRunError(); - if (runError != null) { - publisher = Mono.fromCompletionStage(session.releaseConnectionAsync()) - .onErrorMap(releaseError -> Futures.combineErrors(runError, releaseError)) - .then(Mono.error(runError)); - } else { - publisher = Mono.just(cursor); - } - return publisher; - }) - .map(InternalReactiveResult::new)); + return publisherToFlowPublisher(run(query, config, InternalReactiveResult::new)); } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java index aa52e2ae6e..e135ffa187 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java @@ -96,7 +96,7 @@ public RxResult run(Query query) { public RxResult run(Query query, TransactionConfig config) { return new InternalRxResult(() -> { var resultCursorFuture = new CompletableFuture(); - session.runRx(query, config).whenComplete((cursor, completionError) -> { + session.runRx(query, config, resultCursorFuture).whenComplete((cursor, completionError) -> { if (cursor != null) { resultCursorFuture.complete(cursor); } else { diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/RxUtils.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/RxUtils.java index bc4bf45d7e..445fae3ed7 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/RxUtils.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/RxUtils.java @@ -56,7 +56,7 @@ public static Publisher createEmptyPublisher(Supplier the type of the item to publish. * @return A publisher that succeeds exactly one item or fails with an error. */ - public static Publisher createSingleItemPublisher( + public static Mono createSingleItemPublisher( Supplier> supplier, Supplier nullResultThrowableSupplier, Consumer cancellationHandler) { diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactivestreams/InternalReactiveSession.java b/driver/src/main/java/org/neo4j/driver/internal/reactivestreams/InternalReactiveSession.java index 8f46bb12b5..4e8c60e51d 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactivestreams/InternalReactiveSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactivestreams/InternalReactiveSession.java @@ -20,22 +20,18 @@ import java.util.HashSet; import java.util.Set; -import java.util.concurrent.CompletionStage; import org.neo4j.driver.AccessMode; import org.neo4j.driver.Bookmark; import org.neo4j.driver.Query; import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.internal.async.NetworkSession; import org.neo4j.driver.internal.async.UnmanagedTransaction; -import org.neo4j.driver.internal.cursor.RxResultCursor; import org.neo4j.driver.internal.reactive.AbstractReactiveSession; -import org.neo4j.driver.internal.util.Futures; import org.neo4j.driver.reactivestreams.ReactiveResult; import org.neo4j.driver.reactivestreams.ReactiveSession; import org.neo4j.driver.reactivestreams.ReactiveTransaction; import org.neo4j.driver.reactivestreams.ReactiveTransactionCallback; import org.reactivestreams.Publisher; -import reactor.core.publisher.Mono; public class InternalReactiveSession extends AbstractReactiveSession implements ReactiveSession, BaseReactiveQueryRunner { @@ -83,30 +79,7 @@ public Publisher run(Query query) { @Override public Publisher run(Query query, TransactionConfig config) { - CompletionStage cursorStage; - try { - cursorStage = session.runRx(query, config); - } catch (Throwable t) { - cursorStage = Futures.failedFuture(t); - } - - return Mono.fromCompletionStage(cursorStage) - .onErrorResume(error -> Mono.fromCompletionStage(session.releaseConnectionAsync()) - .onErrorMap(releaseError -> Futures.combineErrors(error, releaseError)) - .then(Mono.error(error))) - .flatMap(cursor -> { - Mono publisher; - var runError = cursor.getRunError(); - if (runError != null) { - publisher = Mono.fromCompletionStage(session.releaseConnectionAsync()) - .onErrorMap(releaseError -> Futures.combineErrors(runError, releaseError)) - .then(Mono.error(runError)); - } else { - publisher = Mono.just(cursor); - } - return publisher; - }) - .map(InternalReactiveResult::new); + return run(query, config, InternalReactiveResult::new); } @Override diff --git a/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveSessionIT.java b/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveSessionIT.java index 1027e6ff6d..46159f6b43 100644 --- a/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveSessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveSessionIT.java @@ -20,13 +20,27 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4; +import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.Flow; import java.util.function.Function; +import java.util.stream.IntStream; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; +import org.neo4j.driver.Config; +import org.neo4j.driver.ConnectionPoolMetrics; import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.internal.util.EnabledOnNeo4jWith; import org.neo4j.driver.reactive.ReactiveResult; @@ -34,7 +48,8 @@ import org.neo4j.driver.testutil.DatabaseExtension; import org.neo4j.driver.testutil.ParallelizableIT; import org.reactivestreams.Publisher; -import reactor.adapter.JdkFlowAdapter; +import org.reactivestreams.Subscription; +import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.Flux; @EnabledOnNeo4jWith(BOLT_V4) @@ -55,13 +70,111 @@ void shouldErrorWhenReactiveResultIsReturned(Function CompletableFuture.supplyAsync( + () -> { + var subscriptionFuture = new CompletableFuture(); + driver.session(ReactiveSession.class) + .run("UNWIND range (0,10000) AS x RETURN x") + .subscribe(new Flow.Subscriber<>() { + @Override + public void onSubscribe(Flow.Subscription subscription) { + subscriptionFuture.complete(subscription); + } + + @Override + public void onNext(ReactiveResult item) { + // ignored + } + + @Override + public void onError(Throwable throwable) { + // ignored + } + + @Override + public void onComplete() { + // ignored + } + }); + return subscriptionFuture.thenApplyAsync( + subscription -> { + if (request) { + subscription.request(1); + } + subscription.cancel(); + return subscription; + }, + executorService); + }, + executorService)) + .map(future -> future.thenCompose(itself -> itself)) + .toArray(CompletableFuture[]::new); + + CompletableFuture.allOf(subscriptionFutures).join(); + + // Subscription cancellation does not guarantee neither onComplete nor onError signal. + var timeout = Instant.now().plus(5, ChronoUnit.MINUTES); + var totalInUseConnections = -1; + while (Instant.now().isBefore(timeout)) { + totalInUseConnections = driver.metrics().connectionPoolMetrics().stream() + .map(ConnectionPoolMetrics::inUse) + .mapToInt(Integer::intValue) + .sum(); + if (totalInUseConnections == 0) { + return; + } + Thread.sleep(100); + } + fail(String.format("not all connections have been released, %d are still in use", totalInUseConnections)); + } + } + + @Test + void shouldRollbackResultOnSubscriptionCancellation() { + var config = Config.builder().withMaxConnectionPoolSize(1).build(); + try (var driver = neo4j.customDriver(config)) { + var session = driver.session(ReactiveSession.class); + var nodeId = UUID.randomUUID().toString(); + var cancellationFuture = new CompletableFuture(); + + flowPublisherToFlux(session.run("CREATE ({id: $id})", Map.of("id", nodeId))) + .subscribe(new BaseSubscriber<>() { + @Override + protected void hookOnSubscribe(Subscription subscription) { + subscription.cancel(); + cancellationFuture.complete(null); + } + }); + + cancellationFuture.join(); + + var nodesNum = flowPublisherToFlux(session.run("MATCH (n {id: $id}) RETURN n", Map.of("id", nodeId))) + .flatMap(result -> flowPublisherToFlux(result.records())) + .count() + .block(); + assertEquals(0, nodesNum); + } } static List>> managedTransactionsReturningReactiveResultPublisher() { return List.of( - session -> JdkFlowAdapter.flowPublisherToFlux(session.executeWrite(tx -> tx.run("RETURN 1"))), - session -> JdkFlowAdapter.flowPublisherToFlux(session.executeRead(tx -> tx.run("RETURN 1")))); + session -> flowPublisherToFlux(session.executeWrite(tx -> tx.run("RETURN 1"))), + session -> flowPublisherToFlux(session.executeRead(tx -> tx.run("RETURN 1")))); } } diff --git a/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveStreamsSessionIT.java b/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveStreamsSessionIT.java index f03117fc97..77ba5dcf51 100644 --- a/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveStreamsSessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveStreamsSessionIT.java @@ -20,13 +20,25 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; import java.util.function.Function; +import java.util.stream.IntStream; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; +import org.neo4j.driver.Config; +import org.neo4j.driver.ConnectionPoolMetrics; import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.internal.util.EnabledOnNeo4jWith; import org.neo4j.driver.reactivestreams.ReactiveResult; @@ -34,7 +46,10 @@ import org.neo4j.driver.testutil.DatabaseExtension; import org.neo4j.driver.testutil.ParallelizableIT; import org.reactivestreams.Publisher; +import org.reactivestreams.Subscription; +import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; @EnabledOnNeo4jWith(BOLT_V4) @ParallelizableIT @@ -57,6 +72,90 @@ void shouldErrorWhenReactiveResultIsReturned(Function CompletableFuture.supplyAsync( + () -> { + var subscriptionFuture = new CompletableFuture(); + driver.session(ReactiveSession.class) + .run("UNWIND range (0,10000) AS x RETURN x") + .subscribe(new BaseSubscriber<>() { + @Override + protected void hookOnSubscribe(Subscription subscription) { + // use subscription from another thread to avoid immediate cancellation + // within the subscribe method + subscriptionFuture.complete(subscription); + } + }); + return subscriptionFuture.thenApplyAsync( + subscription -> { + if (request) { + subscription.request(1); + } + subscription.cancel(); + return subscription; + }, + executorService); + }, + executorService)) + .map(future -> future.thenCompose(itself -> itself)) + .toArray(CompletableFuture[]::new); + + CompletableFuture.allOf(subscriptionFutures).join(); + + // Subscription cancellation does not guarantee neither onComplete nor onError signal. + var timeout = Instant.now().plus(5, ChronoUnit.MINUTES); + var totalInUseConnections = -1; + while (Instant.now().isBefore(timeout)) { + totalInUseConnections = driver.metrics().connectionPoolMetrics().stream() + .map(ConnectionPoolMetrics::inUse) + .mapToInt(Integer::intValue) + .sum(); + if (totalInUseConnections == 0) { + return; + } + Thread.sleep(100); + } + fail(String.format("not all connections have been released, %d are still in use", totalInUseConnections)); + } + } + + @Test + void shouldRollbackResultOnSubscriptionCancellation() { + var config = Config.builder().withMaxConnectionPoolSize(1).build(); + try (var driver = neo4j.customDriver(config)) { + var session = driver.session(ReactiveSession.class); + var nodeId = UUID.randomUUID().toString(); + var cancellationFuture = new CompletableFuture(); + + session.run("CREATE ({id: $id})", Map.of("id", nodeId)).subscribe(new BaseSubscriber<>() { + @Override + protected void hookOnSubscribe(Subscription subscription) { + subscription.cancel(); + cancellationFuture.complete(null); + } + }); + + cancellationFuture.join(); + + var nodesNum = Mono.fromDirect(session.run("MATCH (n {id: $id}) RETURN n", Map.of("id", nodeId))) + .flatMapMany(ReactiveResult::records) + .count() + .block(); + assertEquals(0, nodesNum); + } + } + static List>> managedTransactionsReturningReactiveResultPublisher() { return List.of( diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/NetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/NetworkSessionTest.java index 74f9d40224..6d51f69c21 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/NetworkSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/NetworkSessionTest.java @@ -53,6 +53,7 @@ import java.util.Collections; import java.util.Set; +import java.util.concurrent.CompletableFuture; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -99,7 +100,7 @@ void shouldFlushOnRunAsync() { @Test void shouldFlushOnRunRx() { setupSuccessfulRunRx(connection); - await(session.runRx(new Query("RETURN 1"), TransactionConfig.empty())); + await(session.runRx(new Query("RETURN 1"), TransactionConfig.empty(), CompletableFuture.completedStage(null))); verifyRunRx(connection, "RETURN 1"); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cursor/RxResultCursorImplTest.java b/driver/src/test/java/org/neo4j/driver/internal/cursor/RxResultCursorImplTest.java index f462ce6050..ff306ea513 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cursor/RxResultCursorImplTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cursor/RxResultCursorImplTest.java @@ -60,7 +60,7 @@ void shouldInstallSummaryConsumerWithoutReportingError() { var pullHandler = mock(PullResponseHandler.class); // When - new RxResultCursorImpl(error, runHandler, pullHandler); + new RxResultCursorImpl(error, runHandler, pullHandler, () -> CompletableFuture.completedStage(null)); // Then verify(pullHandler).installSummaryConsumer(any(BiConsumer.class)); @@ -160,7 +160,8 @@ void shouldInstallRecordConsumerAndReportError() { // When var runHandler = newRunResponseHandler(error); PullResponseHandler pullHandler = new ListBasedPullHandler(); - RxResultCursor cursor = new RxResultCursorImpl(error, runHandler, pullHandler); + RxResultCursor cursor = + new RxResultCursorImpl(error, runHandler, pullHandler, () -> CompletableFuture.completedStage(null)); cursor.installRecordConsumer(recordConsumer); // Then diff --git a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalReactiveSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalReactiveSessionTest.java index 33a67a5943..0562c762d4 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalReactiveSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalReactiveSessionTest.java @@ -98,14 +98,16 @@ void shouldDelegateRun(Function> runR RxResultCursor cursor = mock(RxResultCursorImpl.class); // Run succeeded with a cursor - when(session.runRx(any(Query.class), any(TransactionConfig.class))).thenReturn(completedFuture(cursor)); + when(session.runRx(any(Query.class), any(TransactionConfig.class), any())) + .thenReturn(completedFuture(cursor)); var rxSession = new InternalReactiveSession(session); // When var result = flowPublisherToFlux(runReturnOne.apply(rxSession)); + result.subscribe(); // Then - verify(session).runRx(any(Query.class), any(TransactionConfig.class)); + verify(session).runRx(any(Query.class), any(TransactionConfig.class), any()); StepVerifier.create(result).expectNextCount(1).verifyComplete(); } @@ -117,7 +119,8 @@ void shouldReleaseConnectionIfFailedToRun(Function error == t).verify(); - verify(session).runRx(any(Query.class), any(TransactionConfig.class)); + verify(session).runRx(any(Query.class), any(TransactionConfig.class), any()); verify(session).releaseConnectionAsync(); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java index f1b204d4f4..c5afe1e0ca 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java @@ -98,7 +98,8 @@ void shouldDelegateRun(Function runReturnOne) { RxResultCursor cursor = mock(RxResultCursorImpl.class); // Run succeeded with a cursor - when(session.runRx(any(Query.class), any(TransactionConfig.class))).thenReturn(completedFuture(cursor)); + when(session.runRx(any(Query.class), any(TransactionConfig.class), any())) + .thenReturn(completedFuture(cursor)); var rxSession = new InternalRxSession(session); // When @@ -107,7 +108,7 @@ void shouldDelegateRun(Function runReturnOne) { var cursorFuture = ((InternalRxResult) result).cursorFutureSupplier().get(); // Then - verify(session).runRx(any(Query.class), any(TransactionConfig.class)); + verify(session).runRx(any(Query.class), any(TransactionConfig.class), any()); assertThat(Futures.getNow(cursorFuture), equalTo(cursor)); } @@ -119,7 +120,8 @@ void shouldReleaseConnectionIfFailedToRun(Function runRetur var session = mock(NetworkSession.class); // Run failed with error - when(session.runRx(any(Query.class), any(TransactionConfig.class))).thenReturn(Futures.failedFuture(error)); + when(session.runRx(any(Query.class), any(TransactionConfig.class), any())) + .thenReturn(Futures.failedFuture(error)); when(session.releaseConnectionAsync()).thenReturn(Futures.completedWithNull()); var rxSession = new InternalRxSession(session); @@ -130,7 +132,7 @@ void shouldReleaseConnectionIfFailedToRun(Function runRetur var cursorFuture = ((InternalRxResult) result).cursorFutureSupplier().get(); // Then - verify(session).runRx(any(Query.class), any(TransactionConfig.class)); + verify(session).runRx(any(Query.class), any(TransactionConfig.class), any()); RuntimeException t = assertThrows(CompletionException.class, () -> Futures.getNow(cursorFuture)); assertThat(t.getCause(), equalTo(error)); verify(session).releaseConnectionAsync(); diff --git a/driver/src/test/java/org/neo4j/driver/testutil/DatabaseExtension.java b/driver/src/test/java/org/neo4j/driver/testutil/DatabaseExtension.java index be89388771..9a56e9ea1d 100644 --- a/driver/src/test/java/org/neo4j/driver/testutil/DatabaseExtension.java +++ b/driver/src/test/java/org/neo4j/driver/testutil/DatabaseExtension.java @@ -148,6 +148,10 @@ public Driver driver() { return driver; } + public Driver customDriver(Config config) { + return GraphDatabase.driver(boltUri, authToken, config); + } + public void deleteAndStartNeo4j(Map config) { Map updatedConfig = new HashMap<>(defaultConfig); updatedConfig.putAll(config);