diff --git a/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/AdaptingDriverBoltConnection.java b/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/AdaptingDriverBoltConnection.java index 94e7dc119a..e2c6dfa996 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/AdaptingDriverBoltConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/AdaptingDriverBoltConnection.java @@ -21,6 +21,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletionStage; +import java.util.function.Supplier; import org.neo4j.bolt.connection.AccessMode; import org.neo4j.bolt.connection.AuthInfo; import org.neo4j.bolt.connection.AuthTokens; @@ -48,8 +49,8 @@ final class AdaptingDriverBoltConnection implements DriverBoltConnection { } @Override - public CompletionStage onLoop() { - return connection.onLoop().exceptionally(errorMapper::mapAndTrow).thenApply(ignored -> this); + public CompletionStage onLoop(Supplier supplier) { + return connection.onLoop(supplier); } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/DriverBoltConnection.java b/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/DriverBoltConnection.java index ea4b383eb9..cde9fea186 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/DriverBoltConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/DriverBoltConnection.java @@ -20,6 +20,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletionStage; +import java.util.function.Supplier; import org.neo4j.bolt.connection.AccessMode; import org.neo4j.bolt.connection.AuthInfo; import org.neo4j.bolt.connection.BoltConnectionState; @@ -32,7 +33,7 @@ import org.neo4j.driver.Value; public interface DriverBoltConnection { - CompletionStage onLoop(); + CompletionStage onLoop(Supplier supplier); CompletionStage route( DatabaseName databaseName, String impersonatedUser, Set bookmarks); diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/DelegatingBoltConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/DelegatingBoltConnection.java index faef188860..fbf6d7de66 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/DelegatingBoltConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/DelegatingBoltConnection.java @@ -21,6 +21,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletionStage; +import java.util.function.Supplier; import org.neo4j.bolt.connection.AccessMode; import org.neo4j.bolt.connection.AuthInfo; import org.neo4j.bolt.connection.BoltConnectionState; @@ -42,8 +43,8 @@ protected DelegatingBoltConnection(DriverBoltConnection delegate) { } @Override - public CompletionStage onLoop() { - return delegate.onLoop().thenApply(ignored -> this); + public CompletionStage onLoop(Supplier supplier) { + return delegate.onLoop(supplier); } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java index d53224cd6e..c3b1a48e5d 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java @@ -29,8 +29,11 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -707,6 +710,7 @@ public AuthToken overrideAuthToken() { } public static class RunRxResponseHandler implements DriverResponseHandler { + private static final Lock NOOP_LOCK = new NoopLock(); final CompletableFuture cursorFuture = new CompletableFuture<>(); private final Logging logging; private final DriverBoltConnection connection; @@ -763,8 +767,8 @@ public void onComplete() { if (error != null) { runFailed.set(true); } - cursorFuture.complete( - new RxResultCursorImpl(connection, query, runSummary, error, bookmarkConsumer, true, logging)); + cursorFuture.complete(new RxResultCursorImpl( + connection, NOOP_LOCK, query, runSummary, error, bookmarkConsumer, true, logging)); } else { var message = ignoredCount > 0 ? "Run exchange contains ignored messages." @@ -793,4 +797,30 @@ public boolean handleSecurityException(AuthToken authToken, SecurityException ex return false; } } + + private static class NoopLock implements Lock { + @Override + public void lock() {} + + @Override + public void lockInterruptibly() {} + + @Override + public boolean tryLock() { + return true; + } + + @Override + public boolean tryLock(long time, TimeUnit unit) { + return true; + } + + @Override + public void unlock() {} + + @Override + public Condition newCondition() { + return null; + } + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/TerminationAwareBoltConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/TerminationAwareBoltConnection.java index 02a10d11e9..890b7d418d 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/TerminationAwareBoltConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/TerminationAwareBoltConnection.java @@ -21,6 +21,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.function.Consumer; +import java.util.function.Function; import org.neo4j.driver.Logger; import org.neo4j.driver.Logging; import org.neo4j.driver.internal.adaptedbolt.DriverBoltConnection; @@ -48,81 +49,82 @@ public TerminationAwareBoltConnection( public CompletionStage clearAndReset() { var future = new CompletableFuture(); var thisVal = this; - - delegate.onLoop() - .thenCompose(connection -> executor.execute(ignored -> connection - .clear() - .thenCompose(DriverBoltConnection::reset) - .thenCompose(conn -> conn.flush(new DriverResponseHandler() { - Throwable throwable = null; - - @Override - public void onError(Throwable throwable) { - log.error("Unexpected error occurred while resetting connection", throwable); - throwableConsumer.accept(throwable); - this.throwable = throwable; - } - - @Override - public void onComplete() { - if (throwable != null) { - future.completeExceptionally(throwable); - } else { - future.complete(thisVal); - } - } - })))) + delegate.onLoop(() -> executor.execute(ignored -> clearAndResetBolt(future))) + .thenCompose(Function.identity()) .whenComplete((ignored, throwable) -> { if (throwable != null) { throwableConsumer.accept(throwable); future.completeExceptionally(throwable); } }); - return future; } + private CompletionStage clearAndResetBolt(CompletableFuture future) { + var thisVal = this; + return delegate.clear() + .thenCompose(DriverBoltConnection::reset) + .thenCompose(conn -> conn.flush(new DriverResponseHandler() { + Throwable throwable = null; + + @Override + public void onError(Throwable throwable) { + log.error("Unexpected error occurred while resetting connection", throwable); + throwableConsumer.accept(throwable); + this.throwable = throwable; + } + + @Override + public void onComplete() { + if (throwable != null) { + future.completeExceptionally(throwable); + } else { + future.complete(thisVal); + } + } + })); + } + @Override public CompletionStage flush(DriverResponseHandler handler) { - return delegate.onLoop() - .thenCompose(connection -> executor.execute(causeOfTermination -> { - if (causeOfTermination == null) { - log.trace("This connection is active, will flush"); - var terminationAwareResponseHandler = - new TerminationAwareResponseHandler(logging, handler, executor, throwableConsumer); - return delegate.flush(terminationAwareResponseHandler).handle((ignored, flushThrowable) -> { - flushThrowable = Futures.completionExceptionCause(flushThrowable); - if (flushThrowable != null) { - if (log.isTraceEnabled()) { - log.error("The flush has failed", flushThrowable); - } - var flushThrowableRef = flushThrowable; - flushThrowable = executor.execute(existingThrowable -> { - if (existingThrowable != null) { - log.trace( - "The flush has failed, but there is an existing %s", existingThrowable); - return existingThrowable; - } else { - throwableConsumer.accept(flushThrowableRef); - return flushThrowableRef; - } - }); - // rethrow - if (flushThrowable instanceof RuntimeException runtimeException) { - throw runtimeException; - } else { - throw new CompletionException(flushThrowable); - } - } else { - return ignored; - } - }); + return delegate.onLoop(() -> executor.execute(causeOfTermination -> flushBolt(causeOfTermination, handler))) + .thenCompose(Function.identity()); + } + + private CompletionStage flushBolt(Throwable causeOfTermination, DriverResponseHandler handler) { + if (causeOfTermination == null) { + log.trace("This connection is active, will flush"); + var terminationAwareResponseHandler = + new TerminationAwareResponseHandler(logging, handler, executor, throwableConsumer); + return delegate.flush(terminationAwareResponseHandler).handle((ignored, flushThrowable) -> { + flushThrowable = Futures.completionExceptionCause(flushThrowable); + if (flushThrowable != null) { + if (log.isTraceEnabled()) { + log.error("The flush has failed", flushThrowable); + } + var flushThrowableRef = flushThrowable; + flushThrowable = executor.execute(existingThrowable -> { + if (existingThrowable != null) { + log.trace("The flush has failed, but there is an existing %s", existingThrowable); + return existingThrowable; + } else { + throwableConsumer.accept(flushThrowableRef); + return flushThrowableRef; + } + }); + // rethrow + if (flushThrowable instanceof RuntimeException runtimeException) { + throw runtimeException; } else { - // there is an existing error - return connection - .clear() - .thenCompose(ignored -> CompletableFuture.failedStage(causeOfTermination)); + throw new CompletionException(flushThrowable); } - })); + } else { + return ignored; + } + }); + } else { + // there is an existing error + return delegate.clear().thenCompose(ignored -> CompletableFuture.failedStage(causeOfTermination)); + } } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java index 71c5130228..d14a5850f6 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java @@ -106,6 +106,7 @@ private enum State { private final ResultCursorsHolder resultCursors; private final long fetchSize; private final Lock lock = new ReentrantLock(); + private final Lock connectionLock = new ReentrantLock(); private State state = State.ACTIVE; private CompletableFuture commitFuture; private CompletableFuture rollbackFuture; @@ -257,9 +258,17 @@ public CompletionStage runAsync(Query query) { public CompletionStage runRx(Query query) { ensureCanRunQueries(); var parameters = query.parameters().asMap(Values::value); - var responseHandler = new RunRxResponseHandler(logging, apiTelemetryWork, beginFuture, connection, query); - var flushStage = - connection.run(query.text(), parameters).thenCompose(ignored2 -> connection.flush(responseHandler)); + var responseHandler = + new RunRxResponseHandler(logging, apiTelemetryWork, beginFuture, connection, connectionLock, query); + var flushStage = connection + .onLoop(() -> { + connectionLock.lock(); + return connection + .run(query.text(), parameters) + .thenCompose(conn -> conn.flush(responseHandler)) + .whenComplete((ignored, throwable) -> connectionLock.unlock()); + }) + .thenCompose(Function.identity()); return beginFuture.thenCompose(ignored -> { var cursorStage = flushStage.thenCompose(flushResult -> responseHandler.cursorFuture); resultCursors.add(cursorStage); @@ -670,6 +679,7 @@ private static class RunRxResponseHandler implements DriverResponseHandler { private final ApiTelemetryWork apiTelemetryWork; private final CompletableFuture beginFuture; private final DriverBoltConnection connection; + private final Lock connectionLock; private final Query query; private Throwable error; private RunSummary runSummary; @@ -680,11 +690,13 @@ private RunRxResponseHandler( ApiTelemetryWork apiTelemetryWork, CompletableFuture beginFuture, DriverBoltConnection connection, + Lock connectionLock, Query query) { this.logging = logging; this.apiTelemetryWork = apiTelemetryWork; this.beginFuture = beginFuture; this.connection = connection; + this.connectionLock = connectionLock; this.query = query; } @@ -720,13 +732,13 @@ public void onIgnored() { public void onComplete() { if (error != null) { if (!beginFuture.completeExceptionally(error)) { - cursorFuture.complete( - new RxResultCursorImpl(connection, query, null, error, bookmark -> {}, false, logging)); + cursorFuture.complete(new RxResultCursorImpl( + connection, connectionLock, query, null, error, bookmark -> {}, false, logging)); } } else { if (runSummary != null) { cursorFuture.complete(new RxResultCursorImpl( - connection, query, runSummary, null, bookmark -> {}, false, logging)); + connection, connectionLock, query, runSummary, null, bookmark -> {}, false, logging)); } else { var message = ignoredCount > 0 ? "Run exchange contains ignored messages" : "Unexpected state during run"; diff --git a/driver/src/main/java/org/neo4j/driver/internal/boltlistener/ListeningBoltConnection.java b/driver/src/main/java/org/neo4j/driver/internal/boltlistener/ListeningBoltConnection.java index 3a6548ae86..d38e108d84 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/boltlistener/ListeningBoltConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/boltlistener/ListeningBoltConnection.java @@ -22,6 +22,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletionStage; +import java.util.function.Supplier; import org.neo4j.bolt.connection.AccessMode; import org.neo4j.bolt.connection.AuthInfo; import org.neo4j.bolt.connection.AuthToken; @@ -46,8 +47,8 @@ public ListeningBoltConnection(BoltConnection delegate, BoltConnectionListener b } @Override - public CompletionStage onLoop() { - return delegate.onLoop().thenApply(ignored -> this); + public CompletionStage onLoop(Supplier supplier) { + return delegate.onLoop(supplier); } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java b/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java index d0e71f5138..1ceb35af7d 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java @@ -26,8 +26,10 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.locks.Lock; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Function; import org.neo4j.bolt.connection.BoltProtocolVersion; import org.neo4j.bolt.connection.summary.RunSummary; import org.neo4j.driver.Bookmark; @@ -86,6 +88,7 @@ public Optional databaseName() { }; private final Logger log; private final DriverBoltConnection boltConnection; + private final Lock boltConnectionLock; private final Query query; private final RunSummary runSummary; private final Throwable runError; @@ -115,13 +118,15 @@ private enum State { public RxResultCursorImpl( DriverBoltConnection boltConnection, + Lock boltConnectionLock, Query query, RunSummary runSummary, Throwable runError, Consumer bookmarkConsumer, boolean closeOnSummary, Logging logging) { - this.boltConnection = boltConnection; + this.boltConnection = Objects.requireNonNull(boltConnection); + this.boltConnectionLock = Objects.requireNonNull(boltConnectionLock); this.legacyNotifications = new BoltProtocolVersion(5, 5).compareTo(boltConnection.protocolVersion()) > 0; this.query = query; this.runSummary = runError == null ? runSummary : EMPTY_RUN_SUMMARY; @@ -187,16 +192,20 @@ public void request(long n) { case READY -> { var request = appendDemand(n); state = State.STREAMING; - runnable = () -> boltConnection - .pull(runSummary.queryId(), request) - .thenCompose(conn -> conn.flush(this)) - .whenComplete((ignored, throwable) -> { - throwable = Futures.completionExceptionCause(throwable); - if (throwable != null) { - handleError(throwable); - onComplete(); - } - }); + runnable = () -> boltConnection.onLoop(() -> { + boltConnectionLock.lock(); + return boltConnection + .pull(runSummary.queryId(), request) + .thenCompose(conn -> conn.flush(this)) + .whenComplete((ignored, throwable) -> { + boltConnectionLock.unlock(); + throwable = Futures.completionExceptionCause(throwable); + if (throwable != null) { + handleError(throwable); + onComplete(); + } + }); + }); } case STREAMING -> appendDemand(n); case FAILED, DISCARDING, SUCCEEDED -> {} @@ -263,33 +272,42 @@ public CompletionStage rollback() { } } var resetFuture = new CompletableFuture(); - boltConnection - .reset() - .thenCompose(conn -> conn.flush(new DriverResponseHandler() { - Throwable throwable = null; - - @Override - public void onError(Throwable throwable) { - this.throwable = Futures.completionExceptionCause(throwable); - } + boltConnection.onLoop(() -> { + boltConnectionLock.lock(); + return boltConnection + .reset() + .thenCompose(conn -> conn.flush(new DriverResponseHandler() { + Throwable throwable = null; + + @Override + public void onError(Throwable throwable) { + this.throwable = Futures.completionExceptionCause(throwable); + } - @Override - public void onComplete() { + @Override + public void onComplete() { + if (throwable != null) { + resetFuture.completeExceptionally(throwable); + } else { + resetFuture.complete(null); + } + } + })) + .whenComplete((ignored, throwable) -> { + boltConnectionLock.unlock(); + throwable = Futures.completionExceptionCause(throwable); if (throwable != null) { resetFuture.completeExceptionally(throwable); - } else { - resetFuture.complete(null); } - } - })) - .whenComplete((ignored, throwable) -> { - throwable = Futures.completionExceptionCause(throwable); - if (throwable != null) { - resetFuture.completeExceptionally(throwable); - } - }); + }); + }); + return resetFuture - .thenCompose(ignored -> boltConnection.close()) + .thenCompose(ignored -> boltConnection.onLoop(() -> { + boltConnectionLock.lock(); + return boltConnection.close().whenComplete((result, error) -> boltConnectionLock.unlock()); + })) + .thenCompose(Function.identity()) .whenComplete((ignored, throwable) -> completeSummaryFuture(null, null)) .exceptionally(throwable -> null); } @@ -402,16 +420,20 @@ private synchronized void decrementDemand() { private synchronized Runnable setupDiscardRunnable() { state = State.DISCARDING; - return () -> boltConnection - .discard(runSummary.queryId(), -1) - .thenCompose(conn -> conn.flush(this)) - .whenComplete((ignored, throwable) -> { - throwable = Futures.completionExceptionCause(throwable); - if (throwable != null) { - handleError(throwable); - onComplete(); - } - }); + return () -> boltConnection.onLoop(() -> { + boltConnectionLock.lock(); + return boltConnection + .discard(runSummary.queryId(), -1) + .thenCompose(conn -> conn.flush(this)) + .whenComplete((ignored, throwable) -> { + boltConnectionLock.unlock(); + throwable = Futures.completionExceptionCause(throwable); + if (throwable != null) { + handleError(throwable); + onComplete(); + } + }); + }); } private synchronized Runnable setupCompletionRunnableWithPullSummary() { @@ -422,30 +444,38 @@ private synchronized Runnable setupCompletionRunnableWithPullSummary() { if (discardPending) { discardPending = false; state = State.DISCARDING; - runnable = () -> boltConnection - .discard(runSummary.queryId(), -1) - .thenCompose(conn -> conn.flush(this)) - .whenComplete((ignored, flushThrowable) -> { - var error = Futures.completionExceptionCause(flushThrowable); - if (error != null) { - handleError(error); - onComplete(); - } - }); - } else { - var demand = getDemand(); - if (demand != 0) { - state = State.STREAMING; - runnable = () -> boltConnection - .pull(runSummary.queryId(), demand > 0 ? demand : -1) + runnable = () -> boltConnection.onLoop(() -> { + boltConnectionLock.lock(); + return boltConnection + .discard(runSummary.queryId(), -1) .thenCompose(conn -> conn.flush(this)) .whenComplete((ignored, flushThrowable) -> { + boltConnectionLock.unlock(); var error = Futures.completionExceptionCause(flushThrowable); if (error != null) { handleError(error); onComplete(); } }); + }); + } else { + var demand = getDemand(); + if (demand != 0) { + state = State.STREAMING; + runnable = () -> boltConnection.onLoop(() -> { + boltConnectionLock.lock(); + return boltConnection + .pull(runSummary.queryId(), demand > 0 ? demand : -1) + .thenCompose(conn -> conn.flush(this)) + .whenComplete((ignored, flushThrowable) -> { + boltConnectionLock.unlock(); + var error = Futures.completionExceptionCause(flushThrowable); + if (error != null) { + handleError(error); + onComplete(); + } + }); + }); } else { state = State.READY; } @@ -514,7 +544,15 @@ private synchronized Runnable setupCompletionRunnableWithError(Throwable throwab } private void closeBoltConnection(Runnable runnable) { - var closeStage = closeOnSummary ? boltConnection.close() : CompletableFuture.completedStage(null); + var closeStage = CompletableFuture.completedStage(null); + if (closeOnSummary) { + closeStage = closeStage + .thenCompose(ignored -> boltConnection.onLoop(() -> { + boltConnectionLock.lock(); + return boltConnection.close().whenComplete((result, error) -> boltConnectionLock.unlock()); + })) + .thenCompose(Function.identity()); + } closeStage.whenComplete((ignored, closeThrowable) -> { if (log.isTraceEnabled() && closeThrowable != null) { log.error( diff --git a/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveTransactionIT.java b/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveTransactionIT.java index 6243b54a63..b2fc38927d 100644 --- a/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveTransactionIT.java +++ b/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveTransactionIT.java @@ -22,12 +22,16 @@ import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.RegisterExtension; import org.neo4j.driver.Config; import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.exceptions.TransactionTerminatedException; import org.neo4j.driver.internal.reactivestreams.InternalReactiveTransaction; +import org.neo4j.driver.reactivestreams.ReactiveResult; import org.neo4j.driver.reactivestreams.ReactiveSession; import org.neo4j.driver.testutil.DatabaseExtension; import org.neo4j.driver.testutil.ParallelizableIT; @@ -214,4 +218,33 @@ void shouldTerminateTransactionAndHandleFailureResponseOrPreventFurtherPulls() { .blockLast()); Mono.fromDirect(tx.close()).block(); } + + @Test + @SuppressWarnings("resource") + @Timeout(value = 20, unit = TimeUnit.MINUTES) + void shouldBeAbleToRunMultipleQueriesWhileFetchingRecords() { + // Given + var session = neo4j.driver().session(ReactiveSession.class); + var tx = Mono.fromDirect(session.beginTransaction()).block(); + assertNotNull(tx); + var recordsFutures = new CompletableFuture[100]; + + // When + for (var i = 0; i < recordsFutures.length; i++) { + var recordsFuture = new CompletableFuture(); + recordsFutures[i] = recordsFuture; + // This effectively leads to the main thread submitting new queries while the driver thread is fetching + // records from results as they become available. The objective is to make sure the driver handles shared + // Bolt connection as expected. + Mono.fromDirect(tx.run("UNWIND range(0, 10) AS x RETURN x")) + .flatMapMany(ReactiveResult::records) + .collectList() + .doOnNext(ignored -> recordsFuture.complete(null)) + .subscribe(); + } + + // Then + CompletableFuture.allOf(recordsFutures).join(); + Mono.fromDirect(tx.close()).block(); + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/InternalTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/InternalTransactionTest.java index 3c8b0c130e..e698ddaa2b 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/InternalTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/InternalTransactionTest.java @@ -40,6 +40,7 @@ import java.util.concurrent.CompletionStage; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -68,7 +69,10 @@ class InternalTransactionTest { void setUp() { connection = connectionMock(new BoltProtocolVersion(4, 0)); var connectionProvider = mock(DriverBoltConnectionProvider.class); - given(connection.onLoop()).willReturn(CompletableFuture.completedStage(connection)); + given(connection.onLoop(any())).willAnswer(invocationOnMock -> { + Supplier supplier = invocationOnMock.getArgument(0); + return CompletableFuture.completedStage(supplier.get()); + }); given(connectionProvider.connect(any(), any(), any(), any(), any(), any(), any(), any(), any(), any())) .willReturn(CompletableFuture.completedFuture(connection)); given(connection.beginTransaction(any(), any(), any(), any(), any(), any(), any(), any(), any())) diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncSessionTest.java index bddb3892db..2a9570dbb4 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncSessionTest.java @@ -97,7 +97,10 @@ class InternalAsyncSessionTest { @BeforeEach void setUp() { connection = connectionMock(new BoltProtocolVersion(4, 0)); - given(connection.onLoop()).willReturn(CompletableFuture.completedStage(connection)); + given(connection.onLoop(any())).willAnswer(invocationOnMock -> { + Supplier supplier = invocationOnMock.getArgument(0); + return CompletableFuture.completedStage(supplier.get()); + }); given(connection.close()).willReturn(completedFuture(null)); connectionProvider = mock(DriverBoltConnectionProvider.class); given(connectionProvider.connect(any(), any(), any(), any(), any(), any(), any(), any(), any(), any())) diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncTransactionTest.java index d6d5bdd026..496b54e853 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncTransactionTest.java @@ -43,6 +43,7 @@ import java.util.concurrent.ExecutionException; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -73,7 +74,10 @@ class InternalAsyncTransactionTest { @BeforeEach void setUp() { connection = connectionMock(new BoltProtocolVersion(4, 0)); - given(connection.onLoop()).willReturn(CompletableFuture.completedStage(connection)); + given(connection.onLoop(any())).willAnswer(invocationOnMock -> { + Supplier supplier = invocationOnMock.getArgument(0); + return CompletableFuture.completedStage(supplier.get()); + }); var connectionProvider = mock(DriverBoltConnectionProvider.class); given(connectionProvider.connect(any(), any(), any(), any(), any(), any(), any(), any(), any(), any())) .willAnswer((Answer>) invocation -> { diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/LeakLoggingNetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/LeakLoggingNetworkSessionTest.java index 1a60b15293..95e8be9c55 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/LeakLoggingNetworkSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/LeakLoggingNetworkSessionTest.java @@ -35,6 +35,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.mockito.ArgumentCaptor; @@ -93,7 +94,10 @@ void logsMessageWithStacktraceDuringFinalizationIfLeaked(TestInfo testInfo) thro var log = mock(Logger.class); when(logging.getLog(any(Class.class))).thenReturn(log); var connection = TestUtil.connectionMock(); - given(connection.onLoop()).willReturn(CompletableFuture.completedStage(connection)); + given(connection.onLoop(any())).willAnswer(invocationOnMock -> { + Supplier supplier = invocationOnMock.getArgument(0); + return CompletableFuture.completedStage(supplier.get()); + }); given(connection.beginTransaction(any(), any(), any(), any(), any(), any(), any(), any(), any())) .willReturn(completedFuture(connection)); setupConnectionAnswers(connection, List.of(handler -> { diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/NetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/NetworkSessionTest.java index 6cab96aa21..78b2b41975 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/NetworkSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/NetworkSessionTest.java @@ -54,6 +54,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.Consumer; +import java.util.function.Supplier; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -88,7 +89,10 @@ class NetworkSessionTest { @BeforeEach void setUp() { connection = connectionMock(new BoltProtocolVersion(5, 4)); - given(connection.onLoop()).willReturn(CompletableFuture.completedStage(connection)); + given(connection.onLoop(any())).willAnswer(invocationOnMock -> { + Supplier supplier = invocationOnMock.getArgument(0); + return CompletableFuture.completedStage(supplier.get()); + }); given(connection.close()).willReturn(completedFuture(null)); connectionProvider = mock(DriverBoltConnectionProvider.class); given(connectionProvider.connect(any(), any(), any(), any(), any(), any(), any(), any(), any(), any())) @@ -311,7 +315,6 @@ void updatesBookmarkWhenTxIsClosed() { @Test void releasesConnectionWhenTxIsClosed() { - given(connection.onLoop()).willReturn(CompletableFuture.completedStage(connection)); given(connection.beginTransaction(any(), any(), any(), any(), any(), any(), any(), any(), any())) .willReturn(completedFuture(connection)); given(connection.run(any(), any())).willAnswer((Answer>) @@ -536,11 +539,17 @@ void shouldRunAfterBeginTxFailureOnBookmark() { void shouldBeginTxAfterBeginTxFailureOnBookmark() { var error = new RuntimeException("Hi"); var connection1 = connectionMock(new BoltProtocolVersion(5, 0)); - given(connection1.onLoop()).willReturn(CompletableFuture.completedStage(connection)); + given(connection1.onLoop(any())).willAnswer(invocationOnMock -> { + Supplier supplier = invocationOnMock.getArgument(0); + return CompletableFuture.completedStage(supplier.get()); + }); given(connection1.beginTransaction(any(), any(), any(), any(), any(), any(), any(), any(), any())) .willReturn(CompletableFuture.failedStage(error)); var connection2 = connectionMock(new BoltProtocolVersion(5, 0)); - given(connection2.onLoop()).willReturn(CompletableFuture.completedStage(connection)); + given(connection2.onLoop(any())).willAnswer(invocationOnMock -> { + Supplier supplier = invocationOnMock.getArgument(0); + return CompletableFuture.completedStage(supplier.get()); + }); given(connection2.beginTransaction(any(), any(), any(), any(), any(), any(), any(), any(), any())) .willReturn(CompletableFuture.completedStage(connection2)); setupConnectionAnswers(connection2, List.of(handler -> { diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java index 3907298a32..4ddd1e7fdb 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java @@ -82,7 +82,10 @@ class UnmanagedTransactionTest { void shouldFlushOnRunAsync() { // Given var connection = connectionMock(new BoltProtocolVersion(5, 0)); - given(connection.onLoop()).willReturn(CompletableFuture.completedStage(connection)); + given(connection.onLoop(any())).willAnswer(invocationOnMock -> { + Supplier supplier = invocationOnMock.getArgument(0); + return CompletableFuture.completedStage(supplier.get()); + }); given(connection.beginTransaction(any(), any(), any(), any(), any(), any(), any(), any(), any())) .willReturn(completedFuture(connection)); given(connection.run(any(), any())).willReturn(CompletableFuture.completedStage(connection)); @@ -112,7 +115,10 @@ void shouldFlushOnRunAsync() { void shouldFlushOnRunRx() { // Given var connection = connectionMock(new BoltProtocolVersion(5, 0)); - given(connection.onLoop()).willReturn(CompletableFuture.completedStage(connection)); + given(connection.onLoop(any())).willAnswer(invocationOnMock -> { + Supplier supplier = invocationOnMock.getArgument(0); + return CompletableFuture.completedStage(supplier.get()); + }); given(connection.beginTransaction(any(), any(), any(), any(), any(), any(), any(), any(), any())) .willReturn(completedFuture(connection)); given(connection.run(any(), any())).willReturn(CompletableFuture.completedStage(connection)); @@ -141,7 +147,10 @@ void shouldFlushOnRunRx() { void shouldRollbackOnImplicitFailure() { // Given var connection = connectionMock(); - given(connection.onLoop()).willReturn(CompletableFuture.completedStage(connection)); + given(connection.onLoop(any())).willAnswer(invocationOnMock -> { + Supplier supplier = invocationOnMock.getArgument(0); + return CompletableFuture.completedStage(supplier.get()); + }); given(connection.beginTransaction(any(), any(), any(), any(), any(), any(), any(), any(), any())) .willReturn(completedFuture(connection)); given(connection.rollback()).willReturn(CompletableFuture.completedStage(connection)); @@ -172,7 +181,10 @@ void shouldRollbackOnImplicitFailure() { @Test void shouldBeginTransaction() { var connection = connectionMock(); - given(connection.onLoop()).willReturn(CompletableFuture.completedStage(connection)); + given(connection.onLoop(any())).willAnswer(invocationOnMock -> { + Supplier supplier = invocationOnMock.getArgument(0); + return CompletableFuture.completedStage(supplier.get()); + }); given(connection.beginTransaction(any(), any(), any(), any(), any(), any(), any(), any(), any())) .willReturn(completedFuture(connection)); setupConnectionAnswers(connection, List.of(handler -> { @@ -189,7 +201,10 @@ void shouldBeginTransaction() { @Test void shouldBeOpenAfterConstruction() { var connection = connectionMock(); - given(connection.onLoop()).willReturn(CompletableFuture.completedStage(connection)); + given(connection.onLoop(any())).willAnswer(invocationOnMock -> { + Supplier supplier = invocationOnMock.getArgument(0); + return CompletableFuture.completedStage(supplier.get()); + }); given(connection.beginTransaction(any(), any(), any(), any(), any(), any(), any(), any(), any())) .willReturn(completedFuture(connection)); setupConnectionAnswers(connection, List.of(handler -> { @@ -205,7 +220,10 @@ void shouldBeOpenAfterConstruction() { @Test void shouldBeClosedWhenMarkedAsTerminated() { var connection = connectionMock(); - given(connection.onLoop()).willReturn(CompletableFuture.completedStage(connection)); + given(connection.onLoop(any())).willAnswer(invocationOnMock -> { + Supplier supplier = invocationOnMock.getArgument(0); + return CompletableFuture.completedStage(supplier.get()); + }); given(connection.beginTransaction(any(), any(), any(), any(), any(), any(), any(), any(), any())) .willReturn(completedFuture(connection)); setupConnectionAnswers(connection, List.of(handler -> { @@ -222,7 +240,10 @@ void shouldBeClosedWhenMarkedAsTerminated() { @Test void shouldBeClosedWhenMarkedTerminatedAndClosed() { var connection = connectionMock(); - given(connection.onLoop()).willReturn(CompletableFuture.completedStage(connection)); + given(connection.onLoop(any())).willAnswer(invocationOnMock -> { + Supplier supplier = invocationOnMock.getArgument(0); + return CompletableFuture.completedStage(supplier.get()); + }); given(connection.beginTransaction(any(), any(), any(), any(), any(), any(), any(), any(), any())) .willReturn(completedFuture(connection)); setupConnectionAnswers(connection, List.of(handler -> { @@ -242,7 +263,10 @@ void shouldBeClosedWhenMarkedTerminatedAndClosed() { void shouldReleaseConnectionWhenBeginFails() { var error = new RuntimeException("Wrong bookmark!"); var connection = connectionMock(); - given(connection.onLoop()).willReturn(CompletableFuture.completedStage(connection)); + given(connection.onLoop(any())).willAnswer(invocationOnMock -> { + Supplier supplier = invocationOnMock.getArgument(0); + return CompletableFuture.completedStage(supplier.get()); + }); given(connection.beginTransaction(any(), any(), any(), any(), any(), any(), any(), any(), any())) .willReturn(CompletableFuture.completedStage(connection)); setupConnectionAnswers(connection, List.of(handler -> { @@ -275,7 +299,10 @@ void shouldReleaseConnectionWhenBeginFails() { @Test void shouldNotReleaseConnectionWhenBeginSucceeds() { var connection = connectionMock(); - given(connection.onLoop()).willReturn(CompletableFuture.completedStage(connection)); + given(connection.onLoop(any())).willAnswer(invocationOnMock -> { + Supplier supplier = invocationOnMock.getArgument(0); + return CompletableFuture.completedStage(supplier.get()); + }); given(connection.beginTransaction(any(), any(), any(), any(), any(), any(), any(), any(), any())) .willReturn(CompletableFuture.completedStage(connection)); setupConnectionAnswers(connection, List.of(handler -> { @@ -439,7 +466,10 @@ void shouldReleaseConnectionWhenTerminatedAndRolledBack() { @Test void shouldReleaseConnectionWhenClose() { var connection = connectionMock(); - given(connection.onLoop()).willReturn(CompletableFuture.completedStage(connection)); + given(connection.onLoop(any())).willAnswer(invocationOnMock -> { + Supplier supplier = invocationOnMock.getArgument(0); + return CompletableFuture.completedStage(supplier.get()); + }); given(connection.rollback()).willReturn(CompletableFuture.completedStage(connection)); setupConnectionAnswers(connection, List.of(handler -> { handler.onRollbackSummary(mock(RollbackSummary.class)); @@ -468,7 +498,10 @@ void shouldReleaseConnectionWhenClose() { void shouldReleaseConnectionOnConnectionAuthorizationExpiredExceptionFailure() { var exception = new AuthorizationExpiredException("code", "message"); var connection = connectionMock(); - given(connection.onLoop()).willReturn(CompletableFuture.completedStage(connection)); + given(connection.onLoop(any())).willAnswer(invocationOnMock -> { + Supplier supplier = invocationOnMock.getArgument(0); + return CompletableFuture.completedStage(supplier.get()); + }); given(connection.beginTransaction(any(), any(), any(), any(), any(), any(), any(), any(), any())) .willReturn(CompletableFuture.completedStage(connection)); setupConnectionAnswers(connection, List.of(handler -> { @@ -501,7 +534,10 @@ void shouldReleaseConnectionOnConnectionAuthorizationExpiredExceptionFailure() { @Test void shouldReleaseConnectionOnConnectionReadTimeoutExceptionFailure() { var connection = connectionMock(); - given(connection.onLoop()).willReturn(CompletableFuture.completedStage(connection)); + given(connection.onLoop(any())).willAnswer(invocationOnMock -> { + Supplier supplier = invocationOnMock.getArgument(0); + return CompletableFuture.completedStage(supplier.get()); + }); given(connection.beginTransaction(any(), any(), any(), any(), any(), any(), any(), any(), any())) .willReturn(CompletableFuture.completedStage(connection)); setupConnectionAnswers(connection, List.of(handler -> { @@ -545,7 +581,10 @@ private static Stream similarTransactionCompletingActionArgs() { void shouldReturnExistingStageOnSimilarCompletingAction( boolean protocolCommit, String initialAction, String similarAction) { var connection = connectionMock(); - given(connection.onLoop()).willReturn(CompletableFuture.completedStage(connection)); + given(connection.onLoop(any())).willAnswer(invocationOnMock -> { + Supplier supplier = invocationOnMock.getArgument(0); + return CompletableFuture.completedStage(supplier.get()); + }); given(connection.commit()).willReturn(CompletableFuture.completedStage(connection)); given(connection.rollback()).willReturn(CompletableFuture.completedStage(connection)); given(connection.flush(any())).willReturn(CompletableFuture.completedStage(null)); @@ -597,7 +636,10 @@ void shouldReturnFailingStageOnConflictingCompletingAction( String conflictingAction, String expectedErrorMsg) { var connection = connectionMock(); - given(connection.onLoop()).willReturn(CompletableFuture.completedStage(connection)); + given(connection.onLoop(any())).willAnswer(invocationOnMock -> { + Supplier supplier = invocationOnMock.getArgument(0); + return CompletableFuture.completedStage(supplier.get()); + }); given(connection.commit()).willReturn(CompletableFuture.completedStage(connection)); given(connection.rollback()).willReturn(CompletableFuture.completedStage(connection)); if (protocolActionCompleted) { @@ -662,7 +704,10 @@ private static Stream closingNotActionTransactionArgs() { void shouldReturnCompletedWithNullStageOnClosingInactiveTransactionExceptCommittingAborted( boolean protocolCommit, int expectedProtocolInvocations, String originalAction, Boolean commitOnClose) { var connection = connectionMock(); - given(connection.onLoop()).willReturn(CompletableFuture.completedStage(connection)); + given(connection.onLoop(any())).willAnswer(invocationOnMock -> { + Supplier supplier = invocationOnMock.getArgument(0); + return CompletableFuture.completedStage(supplier.get()); + }); given(connection.commit()).willReturn(CompletableFuture.completedStage(connection)); given(connection.rollback()).willReturn(CompletableFuture.completedStage(connection)); setupConnectionAnswers(connection, List.of(handler -> { @@ -705,7 +750,10 @@ void shouldReturnCompletedWithNullStageOnClosingInactiveTransactionExceptCommitt void shouldTerminateOnTerminateAsync() { // Given var connection = connectionMock(new BoltProtocolVersion(4, 0)); - given(connection.onLoop()).willReturn(CompletableFuture.completedStage(connection)); + given(connection.onLoop(any())).willAnswer(invocationOnMock -> { + Supplier supplier = invocationOnMock.getArgument(0); + return CompletableFuture.completedStage(supplier.get()); + }); given(connection.beginTransaction(any(), any(), any(), any(), any(), any(), any(), any(), any())) .willReturn(CompletableFuture.completedStage(connection)); given(connection.clear()).willReturn(CompletableFuture.completedStage(connection)); @@ -735,7 +783,10 @@ void shouldTerminateOnTerminateAsync() { void shouldServeTheSameStageOnTerminateAsync() { // Given var connection = connectionMock(new BoltProtocolVersion(4, 0)); - given(connection.onLoop()).willReturn(CompletableFuture.completedStage(connection)); + given(connection.onLoop(any())).willAnswer(invocationOnMock -> { + Supplier supplier = invocationOnMock.getArgument(0); + return CompletableFuture.completedStage(supplier.get()); + }); given(connection.beginTransaction(any(), any(), any(), any(), any(), any(), any(), any(), any())) .willReturn(CompletableFuture.completedStage(connection)); given(connection.clear()).willReturn(CompletableFuture.completedStage(connection)); @@ -765,7 +816,10 @@ void shouldServeTheSameStageOnTerminateAsync() { void shouldHandleTerminationWhenAlreadyTerminated() throws ExecutionException, InterruptedException { // Given var connection = connectionMock(new BoltProtocolVersion(4, 0)); - given(connection.onLoop()).willReturn(CompletableFuture.completedStage(connection)); + given(connection.onLoop(any())).willAnswer(invocationOnMock -> { + Supplier supplier = invocationOnMock.getArgument(0); + return CompletableFuture.completedStage(supplier.get()); + }); given(connection.beginTransaction(any(), any(), any(), any(), any(), any(), any(), any(), any())) .willReturn(CompletableFuture.completedStage(connection)); given(connection.run(any(), any())).willReturn(CompletableFuture.completedStage(connection)); @@ -802,7 +856,10 @@ void shouldHandleTerminationWhenAlreadyTerminated() throws ExecutionException, I void shouldThrowOnRunningNewQueriesWhenTransactionIsClosing(TransactionClosingTestParams testParams) { // Given var connection = connectionMock(); - given(connection.onLoop()).willReturn(CompletableFuture.completedStage(connection)); + given(connection.onLoop(any())).willAnswer(invocationOnMock -> { + Supplier supplier = invocationOnMock.getArgument(0); + return CompletableFuture.completedStage(supplier.get()); + }); given(connection.beginTransaction(any(), any(), any(), any(), any(), any(), any(), any(), any())) .willReturn(CompletableFuture.completedStage(connection)); given(connection.commit()).willReturn(CompletableFuture.completedStage(connection)); diff --git a/driver/src/test/java/org/neo4j/driver/internal/cursor/RxResultCursorImplTest.java b/driver/src/test/java/org/neo4j/driver/internal/cursor/RxResultCursorImplTest.java index be01917746..82cf1e62e0 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cursor/RxResultCursorImplTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cursor/RxResultCursorImplTest.java @@ -68,7 +68,8 @@ void shouldNotifyRecordConsumerOfRunError(boolean getRunError) { // given var runError = mock(Throwable.class); given(connection.serverAddress()).willReturn(new BoltServerAddress("localhost")); - var cursor = new RxResultCursorImpl(connection, query, null, runError, bookmarkConsumer, false, Logging.none()); + var cursor = new RxResultCursorImpl( + connection, mock(), query, null, runError, bookmarkConsumer, false, Logging.none()); if (getRunError) { assertEquals(runError, cursor.getRunError()); } @@ -89,7 +90,8 @@ void shouldReturnSummaryWithRunError(boolean getRunError) { // given var runError = mock(Throwable.class); given(connection.serverAddress()).willReturn(new BoltServerAddress("localhost")); - var cursor = new RxResultCursorImpl(connection, query, null, runError, bookmarkConsumer, false, Logging.none()); + var cursor = new RxResultCursorImpl( + connection, mock(), query, null, runError, bookmarkConsumer, false, Logging.none()); if (getRunError) { assertEquals(runError, cursor.getRunError()); } @@ -107,8 +109,8 @@ void shouldReturnKeys() { // given var keys = List.of("a", "b"); given(runSummary.keys()).willReturn(keys); - var cursor = - new RxResultCursorImpl(connection, query, runSummary, null, bookmarkConsumer, false, Logging.none()); + var cursor = new RxResultCursorImpl( + connection, mock(), query, runSummary, null, bookmarkConsumer, false, Logging.none()); // when & then assertEquals(keys, cursor.keys()); diff --git a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxResultTest.java b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxResultTest.java index a6264008c1..a1c12f1446 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxResultTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxResultTest.java @@ -39,6 +39,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; +import java.util.function.Supplier; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -137,6 +138,10 @@ void shouldCancelKeys() { void shouldObtainRecordsAndSummary() { // Given var boltConnection = mock(DriverBoltConnection.class); + given(boltConnection.onLoop(any())).willAnswer(invocationOnMock -> { + Supplier supplier = invocationOnMock.getArgument(0); + return CompletableFuture.completedStage(supplier.get()); + }); given(boltConnection.pull(anyLong(), anyLong())).willReturn(CompletableFuture.completedFuture(boltConnection)); given(boltConnection.serverAddress()).willReturn(new BoltServerAddress("localhost")); given(boltConnection.protocolVersion()).willReturn(new BoltProtocolVersion(5, 1)); @@ -170,6 +175,10 @@ void shouldObtainRecordsAndSummary() { void shouldCancelStreamingButObtainSummary() { // Given var boltConnection = mock(DriverBoltConnection.class); + given(boltConnection.onLoop(any())).willAnswer(invocationOnMock -> { + Supplier supplier = invocationOnMock.getArgument(0); + return CompletableFuture.completedStage(supplier.get()); + }); given(boltConnection.pull(anyLong(), anyLong())).willReturn(CompletableFuture.completedFuture(boltConnection)); given(boltConnection.serverAddress()).willReturn(new BoltServerAddress("localhost")); given(boltConnection.protocolVersion()).willReturn(new BoltProtocolVersion(5, 1)); @@ -214,6 +223,10 @@ void shouldErrorIfFailedToCreateCursor() { void shouldErrorIfFailedToStream() { // Given var boltConnection = mock(DriverBoltConnection.class); + given(boltConnection.onLoop(any())).willAnswer(invocationOnMock -> { + Supplier supplier = invocationOnMock.getArgument(0); + return CompletableFuture.completedStage(supplier.get()); + }); given(boltConnection.pull(anyLong(), anyLong())).willReturn(CompletableFuture.completedFuture(boltConnection)); given(boltConnection.serverAddress()).willReturn(new BoltServerAddress("localhost")); given(boltConnection.protocolVersion()).willReturn(new BoltProtocolVersion(5, 1)); @@ -257,7 +270,7 @@ private InternalRxResult newRxResult(DriverBoltConnection boltConnection) { private InternalRxResult newRxResult(DriverBoltConnection boltConnection, RunSummary runSummary) { RxResultCursor cursor = new RxResultCursorImpl( - boltConnection, mock(), runSummary, null, databaseBookmark -> {}, false, Logging.none()); + boltConnection, mock(), mock(), runSummary, null, databaseBookmark -> {}, false, Logging.none()); return newRxResult(cursor); } diff --git a/pom.xml b/pom.xml index 362c4dfb6d..00273a1077 100644 --- a/pom.xml +++ b/pom.xml @@ -32,7 +32,7 @@ true - 1.1.0 + 2.0.0 1.0.4 diff --git a/testkit/stress.py b/testkit/stress.py index e0578e36c9..e389bc03e8 100644 --- a/testkit/stress.py +++ b/testkit/stress.py @@ -30,5 +30,7 @@ "-DexecutionTimeSeconds=10", "-Dmaven.gitcommitid.skip=true", ] + if os.getenv("TEST_NEO4J_BOLT_CONNECTION", "false") == "true" : + cmd.append("-Dneo4j-bolt-connection-bom.version=0.0.0") subprocess.run(cmd, universal_newlines=True, stderr=subprocess.STDOUT, check=True) diff --git a/testkit/unittests.py b/testkit/unittests.py index cf24eb9f28..7418cc0f6f 100644 --- a/testkit/unittests.py +++ b/testkit/unittests.py @@ -4,6 +4,7 @@ Assumes driver has been setup by build script prior to this. """ import subprocess +import os def run(args): @@ -12,4 +13,7 @@ def run(args): if __name__ == "__main__": - run(["mvn", "test", "-Dmaven.gitcommitid.skip"]) + cmd = ["mvn", "test", "-Dmaven.gitcommitid.skip"] + if os.getenv("TEST_NEO4J_BOLT_CONNECTION", "false") == "true" : + cmd.append("-Dneo4j-bolt-connection-bom.version=0.0.0") + run(cmd)