Skip to content

Commit da92b35

Browse files
committed
Update shouldReleaseResultsOnSubscriptionCancellation tests
1 parent 39f57f7 commit da92b35

File tree

3 files changed

+138
-75
lines changed

3 files changed

+138
-75
lines changed

driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveSessionIT.java

Lines changed: 46 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,12 @@
2727

2828
import java.time.Instant;
2929
import java.time.temporal.ChronoUnit;
30+
import java.util.ArrayList;
31+
import java.util.Collections;
3032
import java.util.List;
3133
import java.util.Map;
3234
import java.util.UUID;
3335
import java.util.concurrent.CompletableFuture;
34-
import java.util.concurrent.Executors;
3536
import java.util.concurrent.Flow;
3637
import java.util.concurrent.atomic.AtomicBoolean;
3738
import java.util.function.Function;
@@ -49,6 +50,7 @@
4950
import org.neo4j.driver.reactive.ReactiveResult;
5051
import org.neo4j.driver.reactive.ReactiveSession;
5152
import org.neo4j.driver.testutil.DatabaseExtension;
53+
import org.neo4j.driver.testutil.LoggingUtil;
5254
import org.neo4j.driver.testutil.ParallelizableIT;
5355
import org.reactivestreams.Publisher;
5456
import org.reactivestreams.Subscription;
@@ -81,51 +83,49 @@ void shouldErrorWhenReactiveResultIsReturned(Function<ReactiveSession, Publisher
8183
@ValueSource(booleans = {true, false})
8284
@SuppressWarnings("BusyWait")
8385
void shouldReleaseResultsOnSubscriptionCancellation(boolean request) throws InterruptedException {
84-
var config = Config.builder().withDriverMetrics().build();
86+
var messages = Collections.synchronizedList(new ArrayList<String>());
87+
var config = Config.builder()
88+
.withDriverMetrics()
89+
.withLogging(LoggingUtil.boltLogging(messages))
90+
.build();
8591
try (var driver = neo4j.customDriver(config)) {
8692
// verify the database is available as runs may not report errors due to the subscription cancellation
8793
driver.verifyConnectivity();
88-
var threadsNumber = 100;
89-
var executorService = Executors.newFixedThreadPool(threadsNumber);
90-
91-
var subscriptionFutures = IntStream.range(0, threadsNumber)
92-
.mapToObj(ignored -> CompletableFuture.supplyAsync(
93-
() -> {
94-
var subscriptionFuture = new CompletableFuture<Flow.Subscription>();
95-
driver.session(ReactiveSession.class)
96-
.run("UNWIND range (0,10000) AS x RETURN x")
97-
.subscribe(new Flow.Subscriber<>() {
98-
@Override
99-
public void onSubscribe(Flow.Subscription subscription) {
100-
subscriptionFuture.complete(subscription);
101-
}
102-
103-
@Override
104-
public void onNext(ReactiveResult item) {
105-
// ignored
106-
}
107-
108-
@Override
109-
public void onError(Throwable throwable) {
110-
// ignored
111-
}
112-
113-
@Override
114-
public void onComplete() {
115-
// ignored
116-
}
117-
});
118-
return subscriptionFuture.thenApplyAsync(
119-
subscription -> {
120-
if (request) {
121-
subscription.request(1);
122-
}
123-
subscription.cancel();
124-
return subscription;
125-
},
126-
executorService);
127-
},
128-
executorService))
94+
var tasksNumber = 100;
95+
var subscriptionFutures = IntStream.range(0, tasksNumber)
96+
.mapToObj(ignored -> CompletableFuture.supplyAsync(() -> {
97+
var subscriptionFuture = new CompletableFuture<Flow.Subscription>();
98+
driver.session(ReactiveSession.class)
99+
.run("UNWIND range (0,10000) AS x RETURN x")
100+
.subscribe(new Flow.Subscriber<>() {
101+
@Override
102+
public void onSubscribe(Flow.Subscription subscription) {
103+
subscriptionFuture.complete(subscription);
104+
}
105+
106+
@Override
107+
public void onNext(ReactiveResult item) {
108+
// ignored
109+
}
110+
111+
@Override
112+
public void onError(Throwable throwable) {
113+
// ignored
114+
}
115+
116+
@Override
117+
public void onComplete() {
118+
// ignored
119+
}
120+
});
121+
return subscriptionFuture.thenApplyAsync(subscription -> {
122+
if (request) {
123+
subscription.request(1);
124+
}
125+
subscription.cancel();
126+
return subscription;
127+
});
128+
}))
129129
.map(future -> future.thenCompose(itself -> itself))
130130
.toArray(CompletableFuture[]::new);
131131

@@ -144,7 +144,9 @@ public void onComplete() {
144144
}
145145
Thread.sleep(100);
146146
}
147-
fail(String.format("not all connections have been released, %d are still in use", totalInUseConnections));
147+
fail(String.format(
148+
"not all connections have been released\n%d are still in use\nlatest metrics: %s\nmessage log: \n%s",
149+
totalInUseConnections, driver.metrics().connectionPoolMetrics(), String.join("\n", messages)));
148150
}
149151
}
150152

driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveStreamsSessionIT.java

Lines changed: 33 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,12 @@
2525

2626
import java.time.Instant;
2727
import java.time.temporal.ChronoUnit;
28+
import java.util.ArrayList;
29+
import java.util.Collections;
2830
import java.util.List;
2931
import java.util.Map;
3032
import java.util.UUID;
3133
import java.util.concurrent.CompletableFuture;
32-
import java.util.concurrent.Executors;
3334
import java.util.concurrent.atomic.AtomicBoolean;
3435
import java.util.function.Function;
3536
import java.util.stream.IntStream;
@@ -46,6 +47,7 @@
4647
import org.neo4j.driver.reactivestreams.ReactiveResult;
4748
import org.neo4j.driver.reactivestreams.ReactiveSession;
4849
import org.neo4j.driver.testutil.DatabaseExtension;
50+
import org.neo4j.driver.testutil.LoggingUtil;
4951
import org.neo4j.driver.testutil.ParallelizableIT;
5052
import org.reactivestreams.Publisher;
5153
import org.reactivestreams.Subscription;
@@ -79,38 +81,36 @@ void shouldErrorWhenReactiveResultIsReturned(Function<ReactiveSession, Publisher
7981
@ValueSource(booleans = {true, false})
8082
@SuppressWarnings("BusyWait")
8183
void shouldReleaseResultsOnSubscriptionCancellation(boolean request) throws InterruptedException {
82-
var config = Config.builder().withDriverMetrics().build();
84+
var messages = Collections.synchronizedList(new ArrayList<String>());
85+
var config = Config.builder()
86+
.withDriverMetrics()
87+
.withLogging(LoggingUtil.boltLogging(messages))
88+
.build();
8389
try (var driver = neo4j.customDriver(config)) {
8490
// verify the database is available as runs may not report errors due to the subscription cancellation
8591
driver.verifyConnectivity();
86-
var threadsNumber = 100;
87-
var executorService = Executors.newFixedThreadPool(threadsNumber);
88-
89-
var subscriptionFutures = IntStream.range(0, threadsNumber)
90-
.mapToObj(ignored -> CompletableFuture.supplyAsync(
91-
() -> {
92-
var subscriptionFuture = new CompletableFuture<Subscription>();
93-
driver.session(ReactiveSession.class)
94-
.run("UNWIND range (0,10000) AS x RETURN x")
95-
.subscribe(new BaseSubscriber<>() {
96-
@Override
97-
protected void hookOnSubscribe(Subscription subscription) {
98-
// use subscription from another thread to avoid immediate cancellation
99-
// within the subscribe method
100-
subscriptionFuture.complete(subscription);
101-
}
102-
});
103-
return subscriptionFuture.thenApplyAsync(
104-
subscription -> {
105-
if (request) {
106-
subscription.request(1);
107-
}
108-
subscription.cancel();
109-
return subscription;
110-
},
111-
executorService);
112-
},
113-
executorService))
92+
var tasksNumber = 100;
93+
var subscriptionFutures = IntStream.range(0, tasksNumber)
94+
.mapToObj(ignored -> CompletableFuture.supplyAsync(() -> {
95+
var subscriptionFuture = new CompletableFuture<Subscription>();
96+
driver.session(ReactiveSession.class)
97+
.run("UNWIND range (0,10000) AS x RETURN x")
98+
.subscribe(new BaseSubscriber<>() {
99+
@Override
100+
protected void hookOnSubscribe(Subscription subscription) {
101+
// use subscription from another thread to avoid immediate cancellation
102+
// within the subscribe method
103+
subscriptionFuture.complete(subscription);
104+
}
105+
});
106+
return subscriptionFuture.thenApplyAsync(subscription -> {
107+
if (request) {
108+
subscription.request(1);
109+
}
110+
subscription.cancel();
111+
return subscription;
112+
});
113+
}))
114114
.map(future -> future.thenCompose(itself -> itself))
115115
.toArray(CompletableFuture[]::new);
116116

@@ -129,7 +129,9 @@ protected void hookOnSubscribe(Subscription subscription) {
129129
}
130130
Thread.sleep(100);
131131
}
132-
fail(String.format("not all connections have been released, %d are still in use", totalInUseConnections));
132+
fail(String.format(
133+
"not all connections have been released\n%d are still in use\nlatest metrics: %s\nmessage log: \n%s",
134+
totalInUseConnections, driver.metrics().connectionPoolMetrics(), String.join("\n", messages)));
133135
}
134136
}
135137

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.testutil;
20+
21+
import static org.hamcrest.Matchers.not;
22+
import static org.mockito.ArgumentMatchers.any;
23+
import static org.mockito.BDDMockito.given;
24+
import static org.mockito.BDDMockito.willAnswer;
25+
import static org.mockito.Mockito.mock;
26+
import static org.mockito.hamcrest.MockitoHamcrest.argThat;
27+
28+
import java.time.LocalDateTime;
29+
import java.util.List;
30+
import org.neo4j.driver.Logger;
31+
import org.neo4j.driver.Logging;
32+
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
33+
import org.neo4j.driver.internal.async.outbound.OutboundMessageHandler;
34+
35+
public class LoggingUtil {
36+
public static Logging boltLogging(List<String> messages) {
37+
var logging = mock(Logging.class);
38+
var noopLogger = mock(Logger.class);
39+
var accumulatingLogger = mock(Logger.class);
40+
given(logging.getLog(argThat(not(InboundMessageDispatcher.class)))).willReturn(noopLogger);
41+
given(logging.getLog(argThat(not(OutboundMessageHandler.class)))).willReturn(noopLogger);
42+
given(logging.getLog(InboundMessageDispatcher.class)).willReturn(accumulatingLogger);
43+
given(logging.getLog(OutboundMessageHandler.class)).willReturn(accumulatingLogger);
44+
given(accumulatingLogger.isDebugEnabled()).willReturn(true);
45+
willAnswer(invocationOnMock -> {
46+
var message = (String) invocationOnMock.getArgument(0);
47+
if (message.contains("C: ") || message.contains("S: ")) {
48+
var formattedMessage = String.format(
49+
LocalDateTime.now() + " " + message,
50+
invocationOnMock.getArgument(1).toString());
51+
messages.add(formattedMessage);
52+
}
53+
return null;
54+
})
55+
.given(accumulatingLogger)
56+
.debug(any(String.class), any(Object.class));
57+
return logging;
58+
}
59+
}

0 commit comments

Comments
 (0)