diff --git a/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveSessionIT.java b/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveSessionIT.java index 767b302229..9d07b9cbd8 100644 --- a/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveSessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveSessionIT.java @@ -27,11 +27,12 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; import java.util.concurrent.Flow; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; @@ -49,6 +50,7 @@ import org.neo4j.driver.reactive.ReactiveResult; import org.neo4j.driver.reactive.ReactiveSession; import org.neo4j.driver.testutil.DatabaseExtension; +import org.neo4j.driver.testutil.LoggingUtil; import org.neo4j.driver.testutil.ParallelizableIT; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; @@ -81,51 +83,49 @@ void shouldErrorWhenReactiveResultIsReturned(Function()); + var config = Config.builder() + .withDriverMetrics() + .withLogging(LoggingUtil.boltLogging(messages)) + .build(); try (var driver = neo4j.customDriver(config)) { // verify the database is available as runs may not report errors due to the subscription cancellation driver.verifyConnectivity(); - var threadsNumber = 100; - var executorService = Executors.newFixedThreadPool(threadsNumber); - - var subscriptionFutures = IntStream.range(0, threadsNumber) - .mapToObj(ignored -> CompletableFuture.supplyAsync( - () -> { - var subscriptionFuture = new CompletableFuture(); - driver.session(ReactiveSession.class) - .run("UNWIND range (0,10000) AS x RETURN x") - .subscribe(new Flow.Subscriber<>() { - @Override - public void onSubscribe(Flow.Subscription subscription) { - subscriptionFuture.complete(subscription); - } - - @Override - public void onNext(ReactiveResult item) { - // ignored - } - - @Override - public void onError(Throwable throwable) { - // ignored - } - - @Override - public void onComplete() { - // ignored - } - }); - return subscriptionFuture.thenApplyAsync( - subscription -> { - if (request) { - subscription.request(1); - } - subscription.cancel(); - return subscription; - }, - executorService); - }, - executorService)) + var tasksNumber = 100; + var subscriptionFutures = IntStream.range(0, tasksNumber) + .mapToObj(ignored -> CompletableFuture.supplyAsync(() -> { + var subscriptionFuture = new CompletableFuture(); + driver.session(ReactiveSession.class) + .run("UNWIND range (0,10000) AS x RETURN x") + .subscribe(new Flow.Subscriber<>() { + @Override + public void onSubscribe(Flow.Subscription subscription) { + subscriptionFuture.complete(subscription); + } + + @Override + public void onNext(ReactiveResult result) { + flowPublisherToFlux(result.consume()).subscribe(); + } + + @Override + public void onError(Throwable throwable) { + // ignored + } + + @Override + public void onComplete() { + // ignored + } + }); + return subscriptionFuture.thenApplyAsync(subscription -> { + if (request) { + subscription.request(1); + } + subscription.cancel(); + return subscription; + }); + })) .map(future -> future.thenCompose(itself -> itself)) .toArray(CompletableFuture[]::new); @@ -144,7 +144,9 @@ public void onComplete() { } Thread.sleep(100); } - fail(String.format("not all connections have been released, %d are still in use", totalInUseConnections)); + fail(String.format( + "not all connections have been released\n%d are still in use\nlatest metrics: %s\nmessage log: \n%s", + totalInUseConnections, driver.metrics().connectionPoolMetrics(), String.join("\n", messages))); } } diff --git a/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveStreamsSessionIT.java b/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveStreamsSessionIT.java index eb8e84e4bb..354f8090ec 100644 --- a/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveStreamsSessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveStreamsSessionIT.java @@ -25,11 +25,12 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.stream.IntStream; @@ -46,6 +47,7 @@ import org.neo4j.driver.reactivestreams.ReactiveResult; import org.neo4j.driver.reactivestreams.ReactiveSession; import org.neo4j.driver.testutil.DatabaseExtension; +import org.neo4j.driver.testutil.LoggingUtil; import org.neo4j.driver.testutil.ParallelizableIT; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; @@ -79,38 +81,41 @@ void shouldErrorWhenReactiveResultIsReturned(Function()); + var config = Config.builder() + .withDriverMetrics() + .withLogging(LoggingUtil.boltLogging(messages)) + .build(); try (var driver = neo4j.customDriver(config)) { // verify the database is available as runs may not report errors due to the subscription cancellation driver.verifyConnectivity(); - var threadsNumber = 100; - var executorService = Executors.newFixedThreadPool(threadsNumber); + var tasksNumber = 100; + var subscriptionFutures = IntStream.range(0, tasksNumber) + .mapToObj(ignored -> CompletableFuture.supplyAsync(() -> { + var subscriptionFuture = new CompletableFuture(); + driver.session(ReactiveSession.class) + .run("UNWIND range (0,10000) AS x RETURN x") + .subscribe(new BaseSubscriber<>() { + @Override + protected void hookOnSubscribe(Subscription subscription) { + // use subscription from another thread to avoid immediate cancellation + // within the subscribe method + subscriptionFuture.complete(subscription); + } - var subscriptionFutures = IntStream.range(0, threadsNumber) - .mapToObj(ignored -> CompletableFuture.supplyAsync( - () -> { - var subscriptionFuture = new CompletableFuture(); - driver.session(ReactiveSession.class) - .run("UNWIND range (0,10000) AS x RETURN x") - .subscribe(new BaseSubscriber<>() { - @Override - protected void hookOnSubscribe(Subscription subscription) { - // use subscription from another thread to avoid immediate cancellation - // within the subscribe method - subscriptionFuture.complete(subscription); - } - }); - return subscriptionFuture.thenApplyAsync( - subscription -> { - if (request) { - subscription.request(1); - } - subscription.cancel(); - return subscription; - }, - executorService); - }, - executorService)) + @Override + protected void hookOnNext(ReactiveResult result) { + Mono.fromDirect(result.consume()).subscribe(); + } + }); + return subscriptionFuture.thenApplyAsync(subscription -> { + if (request) { + subscription.request(1); + } + subscription.cancel(); + return subscription; + }); + })) .map(future -> future.thenCompose(itself -> itself)) .toArray(CompletableFuture[]::new); @@ -129,7 +134,9 @@ protected void hookOnSubscribe(Subscription subscription) { } Thread.sleep(100); } - fail(String.format("not all connections have been released, %d are still in use", totalInUseConnections)); + fail(String.format( + "not all connections have been released\n%d are still in use\nlatest metrics: %s\nmessage log: \n%s", + totalInUseConnections, driver.metrics().connectionPoolMetrics(), String.join("\n", messages))); } } diff --git a/driver/src/test/java/org/neo4j/driver/testutil/LoggingUtil.java b/driver/src/test/java/org/neo4j/driver/testutil/LoggingUtil.java new file mode 100644 index 0000000000..66216a64c9 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/testutil/LoggingUtil.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.testutil; + +import static org.hamcrest.Matchers.not; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.willAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.hamcrest.MockitoHamcrest.argThat; + +import java.time.LocalDateTime; +import java.util.List; +import org.neo4j.driver.Logger; +import org.neo4j.driver.Logging; +import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; +import org.neo4j.driver.internal.async.outbound.OutboundMessageHandler; + +public class LoggingUtil { + public static Logging boltLogging(List messages) { + var logging = mock(Logging.class); + var noopLogger = mock(Logger.class); + var accumulatingLogger = mock(Logger.class); + given(logging.getLog(argThat(not(InboundMessageDispatcher.class)))).willReturn(noopLogger); + given(logging.getLog(argThat(not(OutboundMessageHandler.class)))).willReturn(noopLogger); + given(logging.getLog(InboundMessageDispatcher.class)).willReturn(accumulatingLogger); + given(logging.getLog(OutboundMessageHandler.class)).willReturn(accumulatingLogger); + given(accumulatingLogger.isDebugEnabled()).willReturn(true); + willAnswer(invocationOnMock -> { + var message = (String) invocationOnMock.getArgument(0); + if (message.contains("C: ") || message.contains("S: ")) { + var formattedMessage = String.format( + LocalDateTime.now() + " " + message, + invocationOnMock.getArgument(1).toString()); + messages.add(formattedMessage); + } + return null; + }) + .given(accumulatingLogger) + .debug(any(String.class), any(Object.class)); + return logging; + } +}