Skip to content

Commit b1bb793

Browse files
committed
x
1 parent 749b965 commit b1bb793

File tree

8 files changed

+2531
-0
lines changed

8 files changed

+2531
-0
lines changed
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [https://neo4j.com]
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.neo4j.driver.integration.reactive;
18+
19+
import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4;
20+
import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux;
21+
22+
import java.util.function.Function;
23+
import org.junit.jupiter.api.BeforeEach;
24+
import org.junit.jupiter.api.extension.RegisterExtension;
25+
import org.junit.jupiter.params.ParameterizedTest;
26+
import org.junit.jupiter.params.provider.NullSource;
27+
import org.junit.jupiter.params.provider.ValueSource;
28+
import org.neo4j.driver.TransactionConfig;
29+
import org.neo4j.driver.internal.bolt.api.TelemetryApi;
30+
import org.neo4j.driver.internal.reactive.InternalReactiveSession;
31+
import org.neo4j.driver.internal.telemetry.ApiTelemetryWork;
32+
import org.neo4j.driver.internal.util.EnabledOnNeo4jWith;
33+
import org.neo4j.driver.reactive.ReactiveSession;
34+
import org.neo4j.driver.reactive.ReactiveTransaction;
35+
import org.neo4j.driver.summary.ResultSummary;
36+
import org.neo4j.driver.testutil.DatabaseExtension;
37+
import org.neo4j.driver.testutil.ParallelizableIT;
38+
import reactor.core.publisher.Mono;
39+
import reactor.test.StepVerifier;
40+
41+
@EnabledOnNeo4jWith(BOLT_V4)
42+
@ParallelizableIT
43+
class InternalReactiveSessionIT {
44+
@RegisterExtension
45+
static final DatabaseExtension neo4j = new DatabaseExtension();
46+
47+
private InternalReactiveSession session;
48+
49+
@BeforeEach
50+
@SuppressWarnings("resource")
51+
void setUp() {
52+
session = (InternalReactiveSession) neo4j.driver().session(ReactiveSession.class);
53+
}
54+
55+
@ParameterizedTest
56+
@NullSource
57+
@ValueSource(strings = {"IMPLICIT", ""})
58+
void shouldAcceptTxTypeWhenAvailable(String txType) {
59+
// GIVEN
60+
var txConfig = TransactionConfig.empty();
61+
var apiTelemetryWork = new ApiTelemetryWork(TelemetryApi.UNMANAGED_TRANSACTION);
62+
var txMono = Mono.fromDirect(flowPublisherToFlux(session.beginTransaction(txConfig, txType, apiTelemetryWork)));
63+
Function<ReactiveTransaction, Mono<ResultSummary>> txUnit =
64+
tx -> Mono.fromDirect(flowPublisherToFlux(tx.run("RETURN 1")))
65+
.flatMap(result -> Mono.fromDirect(flowPublisherToFlux(result.consume())));
66+
Function<ReactiveTransaction, Mono<Void>> commit = tx -> Mono.fromDirect(flowPublisherToFlux(tx.commit()));
67+
68+
// WHEN
69+
var summaryMono = Mono.usingWhen(txMono, txUnit, commit);
70+
71+
// THEN
72+
StepVerifier.create(summaryMono).expectNextCount(1).expectComplete().verify();
73+
}
74+
}
Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [https://neo4j.com]
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.neo4j.driver.integration.reactive;
18+
19+
import static org.junit.jupiter.api.Assertions.assertEquals;
20+
import static org.junit.jupiter.api.Assertions.assertThrows;
21+
import static org.junit.jupiter.api.Assertions.fail;
22+
import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4;
23+
import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux;
24+
import static reactor.adapter.JdkFlowAdapter.publisherToFlowPublisher;
25+
26+
import java.time.Instant;
27+
import java.time.temporal.ChronoUnit;
28+
import java.util.ArrayList;
29+
import java.util.Collections;
30+
import java.util.List;
31+
import java.util.Map;
32+
import java.util.UUID;
33+
import java.util.concurrent.CompletableFuture;
34+
import java.util.concurrent.Flow;
35+
import java.util.concurrent.atomic.AtomicBoolean;
36+
import java.util.function.Function;
37+
import java.util.stream.IntStream;
38+
import org.junit.jupiter.api.Disabled;
39+
import org.junit.jupiter.api.Test;
40+
import org.junit.jupiter.api.extension.RegisterExtension;
41+
import org.junit.jupiter.params.ParameterizedTest;
42+
import org.junit.jupiter.params.provider.MethodSource;
43+
import org.junit.jupiter.params.provider.ValueSource;
44+
import org.neo4j.driver.Config;
45+
import org.neo4j.driver.ConnectionPoolMetrics;
46+
import org.neo4j.driver.exceptions.ClientException;
47+
import org.neo4j.driver.exceptions.ServiceUnavailableException;
48+
import org.neo4j.driver.internal.util.EnabledOnNeo4jWith;
49+
import org.neo4j.driver.reactive.ReactiveResult;
50+
import org.neo4j.driver.reactive.ReactiveSession;
51+
import org.neo4j.driver.testutil.DatabaseExtension;
52+
import org.neo4j.driver.testutil.LoggingUtil;
53+
import org.neo4j.driver.testutil.ParallelizableIT;
54+
import org.reactivestreams.Publisher;
55+
import org.reactivestreams.Subscription;
56+
import reactor.core.publisher.BaseSubscriber;
57+
import reactor.core.publisher.Flux;
58+
import reactor.util.annotation.NonNull;
59+
60+
@EnabledOnNeo4jWith(BOLT_V4)
61+
@ParallelizableIT
62+
class ReactiveSessionIT {
63+
@RegisterExtension
64+
static final DatabaseExtension neo4j = new DatabaseExtension();
65+
66+
@ParameterizedTest
67+
@MethodSource("managedTransactionsReturningReactiveResultPublisher")
68+
@SuppressWarnings("resource")
69+
void shouldErrorWhenReactiveResultIsReturned(Function<ReactiveSession, Publisher<ReactiveResult>> fn) {
70+
// GIVEN
71+
var session = neo4j.driver().session(ReactiveSession.class);
72+
73+
// WHEN & THEN
74+
var error = assertThrows(
75+
ClientException.class, () -> Flux.from(fn.apply(session)).blockFirst());
76+
assertEquals(
77+
"org.neo4j.driver.reactive.ReactiveResult is not a valid return value, it should be consumed before producing a return value",
78+
error.getMessage());
79+
flowPublisherToFlux(session.close()).blockFirst();
80+
}
81+
82+
@ParameterizedTest
83+
@ValueSource(booleans = {true, false})
84+
@Disabled
85+
@SuppressWarnings("BusyWait")
86+
void shouldReleaseResultsOnSubscriptionCancellation(boolean request) throws InterruptedException {
87+
var messages = Collections.synchronizedList(new ArrayList<String>());
88+
var config = Config.builder()
89+
.withDriverMetrics()
90+
.withLogging(LoggingUtil.boltLogging(messages))
91+
.build();
92+
try (var driver = neo4j.customDriver(config)) {
93+
// verify the database is available as runs may not report errors due to the subscription cancellation
94+
driver.verifyConnectivity();
95+
var tasksNumber = 100;
96+
var subscriptionFutures = IntStream.range(0, tasksNumber)
97+
.mapToObj(ignored -> CompletableFuture.supplyAsync(() -> {
98+
var subscriptionFuture = new CompletableFuture<Flow.Subscription>();
99+
driver.session(ReactiveSession.class)
100+
.run("UNWIND range (0,10000) AS x RETURN x")
101+
.subscribe(new Flow.Subscriber<>() {
102+
@Override
103+
public void onSubscribe(Flow.Subscription subscription) {
104+
subscriptionFuture.complete(subscription);
105+
}
106+
107+
@Override
108+
public void onNext(ReactiveResult result) {
109+
flowPublisherToFlux(result.consume()).subscribe();
110+
}
111+
112+
@Override
113+
public void onError(Throwable throwable) {
114+
// ignored
115+
}
116+
117+
@Override
118+
public void onComplete() {
119+
// ignored
120+
}
121+
});
122+
return subscriptionFuture.thenApplyAsync(subscription -> {
123+
if (request) {
124+
subscription.request(1);
125+
}
126+
subscription.cancel();
127+
return subscription;
128+
});
129+
}))
130+
.map(future -> future.thenCompose(Function.identity()))
131+
.toArray(CompletableFuture[]::new);
132+
133+
CompletableFuture.allOf(subscriptionFutures).join();
134+
135+
// Subscription cancellation does not guarantee neither onComplete nor onError signal.
136+
var timeout = Instant.now().plus(5, ChronoUnit.MINUTES);
137+
var totalInUseConnections = -1;
138+
while (Instant.now().isBefore(timeout)) {
139+
totalInUseConnections = driver.metrics().connectionPoolMetrics().stream()
140+
.map(ConnectionPoolMetrics::inUse)
141+
.mapToInt(Integer::intValue)
142+
.sum();
143+
if (totalInUseConnections == 0) {
144+
return;
145+
}
146+
Thread.sleep(100);
147+
}
148+
fail(String.format(
149+
"not all connections have been released\n%d are still in use\nlatest metrics: %s\nmessage log: \n%s",
150+
totalInUseConnections, driver.metrics().connectionPoolMetrics(), String.join("\n", messages)));
151+
}
152+
}
153+
154+
@Test
155+
@Disabled
156+
void shouldRollbackResultOnSubscriptionCancellation() {
157+
var config = Config.builder().withMaxConnectionPoolSize(1).build();
158+
try (var driver = neo4j.customDriver(config)) {
159+
var session = driver.session(ReactiveSession.class);
160+
var nodeId = UUID.randomUUID().toString();
161+
var cancellationFuture = new CompletableFuture<Void>();
162+
163+
flowPublisherToFlux(session.run("CREATE ({id: $id})", Map.of("id", nodeId)))
164+
.subscribe(new BaseSubscriber<>() {
165+
@Override
166+
protected void hookOnSubscribe(@NonNull Subscription subscription) {
167+
subscription.cancel();
168+
cancellationFuture.complete(null);
169+
}
170+
});
171+
172+
cancellationFuture.join();
173+
174+
var nodesNum = flowPublisherToFlux(session.run("MATCH (n {id: $id}) RETURN n", Map.of("id", nodeId)))
175+
.flatMap(result -> flowPublisherToFlux(result.records()))
176+
.count()
177+
.block();
178+
assertEquals(0, nodesNum);
179+
}
180+
}
181+
182+
@Test
183+
void shouldEmitAllSuccessfullyEmittedValues() {
184+
@SuppressWarnings("resource")
185+
var session = neo4j.driver().session(ReactiveSession.class);
186+
var succeed = new AtomicBoolean();
187+
var numbers = flowPublisherToFlux(session.executeRead(tx -> {
188+
var numbersFlux = flowPublisherToFlux(tx.run("UNWIND range(0, 5) AS x RETURN x"))
189+
.flatMap(result -> flowPublisherToFlux(result.records()))
190+
.map(record -> record.get("x").asInt());
191+
return succeed.getAndSet(true)
192+
? publisherToFlowPublisher(numbersFlux)
193+
: publisherToFlowPublisher(numbersFlux.handle((value, sink) -> {
194+
if (value == 2) {
195+
sink.error(new ServiceUnavailableException("simulated"));
196+
} else {
197+
sink.next(value);
198+
}
199+
}));
200+
}))
201+
.collectList()
202+
.block();
203+
assertEquals(List.of(0, 1, 0, 1, 2, 3, 4, 5), numbers);
204+
}
205+
206+
static List<Function<ReactiveSession, Publisher<ReactiveResult>>>
207+
managedTransactionsReturningReactiveResultPublisher() {
208+
return List.of(
209+
session -> flowPublisherToFlux(session.executeWrite(tx -> tx.run("RETURN 1"))),
210+
session -> flowPublisherToFlux(session.executeRead(tx -> tx.run("RETURN 1"))));
211+
}
212+
}

0 commit comments

Comments
 (0)