|
26 | 26 | import reactor.test.StepVerifier;
|
27 | 27 |
|
28 | 28 | import java.net.URI;
|
| 29 | +import java.util.ArrayList; |
29 | 30 | import java.util.List;
|
30 | 31 | import java.util.Optional;
|
31 | 32 | import java.util.function.Consumer;
|
|
38 | 39 | import org.neo4j.driver.GraphDatabase;
|
39 | 40 | import org.neo4j.driver.Logger;
|
40 | 41 | import org.neo4j.driver.Record;
|
41 |
| -import org.neo4j.driver.Session; |
42 | 42 | import org.neo4j.driver.Result;
|
| 43 | +import org.neo4j.driver.Session; |
43 | 44 | import org.neo4j.driver.Transaction;
|
| 45 | +import org.neo4j.driver.async.AsyncSession; |
| 46 | +import org.neo4j.driver.async.ResultCursor; |
44 | 47 | import org.neo4j.driver.exceptions.TransientException;
|
45 | 48 | import org.neo4j.driver.internal.cluster.RoutingSettings;
|
46 | 49 | import org.neo4j.driver.internal.retry.RetrySettings;
|
47 | 50 | import org.neo4j.driver.internal.util.Clock;
|
48 | 51 | import org.neo4j.driver.internal.util.io.ChannelTrackingDriverFactory;
|
49 |
| -import org.neo4j.driver.reactive.RxSession; |
50 | 52 | import org.neo4j.driver.reactive.RxResult;
|
| 53 | +import org.neo4j.driver.reactive.RxSession; |
51 | 54 | import org.neo4j.driver.util.StubServer;
|
52 | 55 |
|
53 | 56 | import static java.util.Arrays.asList;
|
|
72 | 75 | import static org.neo4j.driver.util.StubServer.INSECURE_CONFIG;
|
73 | 76 | import static org.neo4j.driver.util.StubServer.insecureBuilder;
|
74 | 77 | import static org.neo4j.driver.util.TestUtil.asOrderedSet;
|
| 78 | +import static org.neo4j.driver.util.TestUtil.await; |
75 | 79 |
|
76 | 80 | class DirectDriverBoltKitTest
|
77 | 81 | {
|
@@ -306,6 +310,75 @@ void shouldChangeFetchSize() throws Exception
|
306 | 310 | }
|
307 | 311 | }
|
308 | 312 |
|
| 313 | + @Test |
| 314 | + void shouldOnlyPullRecordsWhenNeededSimpleSession() throws Exception |
| 315 | + { |
| 316 | + StubServer server = StubServer.start( "streaming_records_v4_buffering.script", 9001 ); |
| 317 | + try |
| 318 | + { |
| 319 | + try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", INSECURE_CONFIG ) ) |
| 320 | + { |
| 321 | + Session session = driver.session( builder().withFetchSize( 2 ).build() ); |
| 322 | + Result result = session.run( "MATCH (n) RETURN n.name" ); |
| 323 | + ArrayList<String> resultList = new ArrayList<>(); |
| 324 | + result.forEachRemaining( ( rec ) -> resultList.add( rec.get( 0 ).asString() ) ); |
| 325 | + |
| 326 | + assertEquals( resultList, asList( "Bob", "Alice", "Tina", "Frank", "Daisy", "Clive" ) ); |
| 327 | + } |
| 328 | + } |
| 329 | + finally |
| 330 | + { |
| 331 | + assertEquals( 0, server.exitStatus() ); |
| 332 | + } |
| 333 | + } |
| 334 | + |
| 335 | + @Test |
| 336 | + void shouldOnlyPullRecordsWhenNeededAsyncSession() throws Exception |
| 337 | + { |
| 338 | + StubServer server = StubServer.start( "streaming_records_v4_buffering.script", 9001 ); |
| 339 | + try |
| 340 | + { |
| 341 | + try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", INSECURE_CONFIG ) ) |
| 342 | + { |
| 343 | + AsyncSession session = driver.asyncSession( builder().withFetchSize( 2 ).build() ); |
| 344 | + |
| 345 | + ArrayList<String> resultList = new ArrayList<>(); |
| 346 | + |
| 347 | + await( session.runAsync( "MATCH (n) RETURN n.name" ) |
| 348 | + .thenCompose( resultCursor -> |
| 349 | + resultCursor.forEachAsync( record -> resultList.add( record.get( 0 ).asString() ) ) ) ); |
| 350 | + |
| 351 | + assertEquals( resultList, asList( "Bob", "Alice", "Tina", "Frank", "Daisy", "Clive" ) ); |
| 352 | + } |
| 353 | + } |
| 354 | + finally |
| 355 | + { |
| 356 | + assertEquals( 0, server.exitStatus() ); |
| 357 | + } |
| 358 | + } |
| 359 | + |
| 360 | + @Test |
| 361 | + void shouldPullAllRecordsOnListAsyncWhenOverWatermark() throws Exception |
| 362 | + { |
| 363 | + StubServer server = StubServer.start( "streaming_records_v4_list_async.script", 9001 ); |
| 364 | + try |
| 365 | + { |
| 366 | + try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", INSECURE_CONFIG ) ) |
| 367 | + { |
| 368 | + AsyncSession session = driver.asyncSession( builder().withFetchSize( 10 ).build() ); |
| 369 | + |
| 370 | + ResultCursor cursor = await( session.runAsync( "MATCH (n) RETURN n.name" ) ); |
| 371 | + List<String> records = await( cursor.listAsync( record -> record.get( 0 ).asString() ) ); |
| 372 | + |
| 373 | + assertEquals( records, asList( "A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L" ) ); |
| 374 | + } |
| 375 | + } |
| 376 | + finally |
| 377 | + { |
| 378 | + assertEquals( 0, server.exitStatus() ); |
| 379 | + } |
| 380 | + } |
| 381 | + |
309 | 382 | @Test
|
310 | 383 | void shouldAllowPullAll() throws Exception
|
311 | 384 | {
|
|
0 commit comments