|
18 | 18 | */
|
19 | 19 | package org.neo4j.driver.tck.reactive;
|
20 | 20 |
|
21 |
| -import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux; |
22 |
| - |
23 |
| -import java.time.Duration; |
24 | 21 | import org.neo4j.driver.Driver;
|
25 |
| -import org.neo4j.driver.reactive.ReactiveResult; |
26 |
| -import org.neo4j.driver.reactive.ReactiveSession; |
| 22 | +import org.neo4j.driver.Record; |
| 23 | +import org.neo4j.driver.reactivestreams.ReactiveResult; |
| 24 | +import org.neo4j.driver.reactivestreams.ReactiveSession; |
27 | 25 | import org.reactivestreams.Publisher;
|
28 | 26 | import org.reactivestreams.tck.PublisherVerification;
|
29 | 27 | import org.reactivestreams.tck.TestEnvironment;
|
30 | 28 | import org.testng.annotations.AfterClass;
|
31 | 29 | import org.testng.annotations.BeforeClass;
|
32 | 30 | import reactor.core.publisher.Mono;
|
33 | 31 |
|
34 |
| -public class ReactiveResultPublisherVerificationIT extends PublisherVerification<ReactiveResult> { |
| 32 | +import java.time.Duration; |
| 33 | +import java.util.Map; |
| 34 | + |
| 35 | +public class ReactiveResultPublisherVerificationIT extends PublisherVerification<Record> { |
35 | 36 | private final Neo4jManager NEO4J = new Neo4jManager();
|
36 | 37 | private static final Duration TIMEOUT = Duration.ofSeconds(10);
|
37 | 38 | private static final Duration TIMEOUT_FOR_NO_SIGNALS = Duration.ofSeconds(1);
|
@@ -63,15 +64,14 @@ public long maxElementsFromPublisher() {
|
63 | 64 | }
|
64 | 65 |
|
65 | 66 | @Override
|
66 |
| - public Publisher<ReactiveResult> createPublisher(long elements) { |
67 |
| - ReactiveSession session = driver.session(ReactiveSession.class); |
68 |
| - return Mono.from(flowPublisherToFlux(session.run("RETURN 1"))); |
| 67 | + public Publisher<Record> createPublisher(long elements) { |
| 68 | + var session = driver.session(ReactiveSession.class); |
| 69 | + return Mono.from(session.run("UNWIND range(0, $elements) AS x RETURN x", Map.of("elements", elements - 1))) |
| 70 | + .flatMapMany(ReactiveResult::records); |
69 | 71 | }
|
70 | 72 |
|
71 | 73 | @Override
|
72 |
| - public Publisher<ReactiveResult> createFailedPublisher() { |
73 |
| - ReactiveSession session = driver.session(ReactiveSession.class); |
74 |
| - // Please note that this publisher fails on run stage. |
75 |
| - return Mono.from(flowPublisherToFlux(session.run("RETURN 5/0"))); |
| 74 | + public Publisher<Record> createFailedPublisher() { |
| 75 | + return null; |
76 | 76 | }
|
77 | 77 | }
|
0 commit comments