|
28 | 28 | import java.util.concurrent.CountDownLatch;
|
29 | 29 | import java.util.concurrent.ExecutorService;
|
30 | 30 | import java.util.concurrent.Executors;
|
| 31 | +import java.util.concurrent.Future; |
31 | 32 | import java.util.concurrent.TimeUnit;
|
32 | 33 | import java.util.concurrent.TimeoutException;
|
33 | 34 |
|
|
59 | 60 | import org.neo4j.driver.v1.util.cc.ClusterMemberRole;
|
60 | 61 | import org.neo4j.driver.v1.util.cc.ClusterRule;
|
61 | 62 |
|
| 63 | +import static java.util.concurrent.TimeUnit.SECONDS; |
62 | 64 | import static org.hamcrest.Matchers.containsString;
|
63 | 65 | import static org.hamcrest.Matchers.instanceOf;
|
64 | 66 | import static org.hamcrest.Matchers.startsWith;
|
@@ -233,7 +235,7 @@ public void shouldDropBrokenOldSessions() throws Exception
|
233 | 235 |
|
234 | 236 | URI routingUri = cluster.leader().getRoutingUri();
|
235 | 237 | AuthToken auth = clusterRule.getDefaultAuthToken();
|
236 |
| - RoutingSettings routingSettings = new RoutingSettings( 1, TimeUnit.SECONDS.toMillis( 5 ), null ); |
| 238 | + RoutingSettings routingSettings = new RoutingSettings( 1, SECONDS.toMillis( 5 ), null ); |
237 | 239 | RetrySettings retrySettings = RetrySettings.DEFAULT;
|
238 | 240 |
|
239 | 241 | try ( Driver driver = driverFactory.newInstance( routingUri, auth, routingSettings, retrySettings, config ) )
|
@@ -450,6 +452,43 @@ public Integer execute( Transaction tx )
|
450 | 452 | }
|
451 | 453 | }
|
452 | 454 |
|
| 455 | + @Test |
| 456 | + public void shouldAcceptMultipleBookmarks() throws Exception |
| 457 | + { |
| 458 | + int threadCount = 5; |
| 459 | + String label = "Person"; |
| 460 | + String property = "name"; |
| 461 | + String value = "Alice"; |
| 462 | + |
| 463 | + Cluster cluster = clusterRule.getCluster(); |
| 464 | + ClusterMember leader = cluster.leader(); |
| 465 | + ExecutorService executor = Executors.newCachedThreadPool(); |
| 466 | + |
| 467 | + try ( Driver driver = createDriver( leader.getRoutingUri() ) ) |
| 468 | + { |
| 469 | + List<Future<String>> futures = new ArrayList<>(); |
| 470 | + for ( int i = 0; i < threadCount; i++ ) |
| 471 | + { |
| 472 | + futures.add( executor.submit( createNodeAndGetBookmark( driver, label, property, value ) ) ); |
| 473 | + } |
| 474 | + |
| 475 | + List<String> bookmarks = new ArrayList<>(); |
| 476 | + for ( Future<String> future : futures ) |
| 477 | + { |
| 478 | + bookmarks.add( future.get( 10, SECONDS ) ); |
| 479 | + } |
| 480 | + |
| 481 | + executor.shutdown(); |
| 482 | + assertTrue( executor.awaitTermination( 5, SECONDS ) ); |
| 483 | + |
| 484 | + try ( Session session = driver.session( AccessMode.READ, bookmarks ) ) |
| 485 | + { |
| 486 | + int count = countNodes( session, label, property, value ); |
| 487 | + assertEquals( count, threadCount ); |
| 488 | + } |
| 489 | + } |
| 490 | + } |
| 491 | + |
453 | 492 | private int executeWriteAndReadThroughBolt( ClusterMember member ) throws TimeoutException, InterruptedException
|
454 | 493 | {
|
455 | 494 | try ( Driver driver = createDriver( member.getRoutingUri() ) )
|
@@ -571,7 +610,7 @@ private void awaitLeaderToStepDown( Driver driver )
|
571 | 610 | int newLeadersCount = 0;
|
572 | 611 | int newFollowersCount = 0;
|
573 | 612 | int newReadReplicasCount = 0;
|
574 |
| - for ( Record record : session.run( "CALL dbms.cluster.overview" ).list() ) |
| 613 | + for ( Record record : session.run( "CALL dbms.cluster.overview()" ).list() ) |
575 | 614 | {
|
576 | 615 | ClusterMemberRole role = ClusterMemberRole.valueOf( record.get( "role" ).asString() );
|
577 | 616 | if ( role == ClusterMemberRole.LEADER )
|
@@ -669,4 +708,44 @@ private static void closeAndExpectException( AutoCloseable closeable, Class<? ex
|
669 | 708 | assertThat( e, instanceOf( exceptionClass ) );
|
670 | 709 | }
|
671 | 710 | }
|
| 711 | + |
| 712 | + private static int countNodes( Session session, final String label, final String property, final String value ) |
| 713 | + { |
| 714 | + return session.readTransaction( new TransactionWork<Integer>() |
| 715 | + { |
| 716 | + @Override |
| 717 | + public Integer execute( Transaction tx ) |
| 718 | + { |
| 719 | + StatementResult result = tx.run( "MATCH (n:" + label + " {" + property + ": $value}) RETURN count(n)", |
| 720 | + parameters( "value", value ) ); |
| 721 | + return result.single().get( 0 ).asInt(); |
| 722 | + } |
| 723 | + } ); |
| 724 | + } |
| 725 | + |
| 726 | + private static Callable<String> createNodeAndGetBookmark( final Driver driver, final String label, |
| 727 | + final String property, final String value ) |
| 728 | + { |
| 729 | + return new Callable<String>() |
| 730 | + { |
| 731 | + @Override |
| 732 | + public String call() |
| 733 | + { |
| 734 | + try ( Session session = driver.session() ) |
| 735 | + { |
| 736 | + session.writeTransaction( new TransactionWork<Void>() |
| 737 | + { |
| 738 | + @Override |
| 739 | + public Void execute( Transaction tx ) |
| 740 | + { |
| 741 | + tx.run( "CREATE (n:" + label + ") SET n." + property + " = $value", |
| 742 | + parameters( "value", value ) ); |
| 743 | + return null; |
| 744 | + } |
| 745 | + } ); |
| 746 | + return session.lastBookmark(); |
| 747 | + } |
| 748 | + } |
| 749 | + }; |
| 750 | + } |
672 | 751 | }
|
0 commit comments