|
26 | 26 | import reactor.test.StepVerifier;
|
27 | 27 |
|
28 | 28 | import java.util.Arrays;
|
| 29 | +import java.util.Collections; |
29 | 30 | import java.util.Iterator;
|
30 | 31 | import java.util.Set;
|
31 | 32 | import java.util.concurrent.atomic.AtomicInteger;
|
|
38 | 39 | import org.neo4j.driver.exceptions.SessionExpiredException;
|
39 | 40 | import org.neo4j.driver.exceptions.TransientException;
|
40 | 41 | import org.neo4j.driver.internal.util.EnabledOnNeo4jWith;
|
41 |
| -import org.neo4j.driver.reactive.RxStatementResult; |
42 | 42 | import org.neo4j.driver.reactive.RxSession;
|
| 43 | +import org.neo4j.driver.reactive.RxStatementResult; |
43 | 44 | import org.neo4j.driver.reactive.RxTransaction;
|
44 | 45 | import org.neo4j.driver.reactive.RxTransactionWork;
|
45 | 46 | import org.neo4j.driver.util.DatabaseExtension;
|
@@ -182,6 +183,43 @@ void shouldRunAsyncTransactionThatCanNotBeRetriedAfterATransientFailure()
|
182 | 183 | assertNoParallelScheduler();
|
183 | 184 | }
|
184 | 185 |
|
| 186 | + @Test |
| 187 | + void shouldHandleNestedQueries() { |
| 188 | + int size = 12555; |
| 189 | + |
| 190 | + Flux<Integer> nodeIds = Flux.using( neo4j.driver()::rxSession, |
| 191 | + session -> Flux.from( session.run( "UNWIND range(1, $size) AS x RETURN x", |
| 192 | + Collections.singletonMap( "size", size ) ).records() ) |
| 193 | + .limitRate( 20 ) |
| 194 | + .flatMap( record -> { |
| 195 | + int x = record.get( "x" ).asInt(); |
| 196 | + RxStatementResult innerResult = session.run( "CREATE (n:Node {id: $x}) RETURN n.id", |
| 197 | + Collections.singletonMap( "x", x ) ); |
| 198 | + return innerResult.records(); |
| 199 | + } ).map( r -> r.get( 0 ).asInt() ), RxSession::close ); |
| 200 | + |
| 201 | + StepVerifier.create( nodeIds ).expectNextCount( size ).verifyComplete(); |
| 202 | + } |
| 203 | + |
| 204 | + @Test |
| 205 | + void shouldGiveHelpfulErrorMessageWhenNestingTransactionFunctions() { |
| 206 | + |
| 207 | + int size = 12555; |
| 208 | + Flux<Integer> nodeIds = Flux.using( neo4j.driver()::rxSession, |
| 209 | + session -> Flux.from( session.readTransaction( tx -> |
| 210 | + tx.run( "UNWIND range(1, $size) AS x RETURN x", |
| 211 | + Collections.singletonMap( "size", size ) ).records() ) ) |
| 212 | + .limitRate( 20 ) |
| 213 | + .flatMap( record -> { |
| 214 | + int x = record.get( "x" ).asInt(); |
| 215 | + return session.writeTransaction( tx -> |
| 216 | + tx.run( "CREATE (n:Node {id: $x}) RETURN n.id", |
| 217 | + Collections.singletonMap( "x", x ) ).records() ); |
| 218 | + } ).map( r -> r.get( 0 ).asInt() ), RxSession::close ); |
| 219 | + |
| 220 | + StepVerifier.create( nodeIds ).expectNextCount( size ).verifyComplete(); |
| 221 | + } |
| 222 | + |
185 | 223 | private void assertNoParallelScheduler()
|
186 | 224 | {
|
187 | 225 | Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
|
|
0 commit comments