Skip to content

Commit 39d00db

Browse files
authored
Merge pull request #612 from zhenlineo/2.0-cluster
Update cluster tests for 4.0 cluster
2 parents d508e3c + b4dcd6b commit 39d00db

13 files changed

+242
-82
lines changed

driver/src/test/java/org/neo4j/driver/integration/ResultStreamIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.neo4j.driver.Value;
3535
import org.neo4j.driver.exceptions.ClientException;
3636
import org.neo4j.driver.exceptions.NoSuchRecordException;
37+
import org.neo4j.driver.internal.util.Neo4jFeature;
3738
import org.neo4j.driver.summary.ResultSummary;
3839
import org.neo4j.driver.util.ParallelizableIT;
3940
import org.neo4j.driver.util.SessionExtension;
@@ -281,6 +282,7 @@ void shouldConvertImmediatelyFailingStatementResultToStream()
281282
}
282283

283284
@Test
285+
@DisabledOnNeo4jWith( Neo4jFeature.NO_STREAMING )
284286
void shouldConvertEventuallyFailingStatementResultToStream()
285287
{
286288
List<Integer> seen = new ArrayList<>();

driver/src/test/java/org/neo4j/driver/integration/SessionIT.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@
9494
import static org.neo4j.driver.internal.util.Matchers.arithmeticError;
9595
import static org.neo4j.driver.internal.util.Matchers.connectionAcquisitionTimeoutError;
9696
import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4;
97+
import static org.neo4j.driver.internal.util.Neo4jFeature.NO_STREAMING;
9798
import static org.neo4j.driver.util.DaemonThreadFactory.daemon;
9899
import static org.neo4j.driver.util.Neo4jRunner.DEFAULT_AUTH_TOKEN;
99100

@@ -864,6 +865,7 @@ void shouldNotRetryOnConnectionAcquisitionTimeout()
864865
}
865866

866867
@Test
868+
@DisabledOnNeo4jWith( NO_STREAMING )
867869
void shouldAllowConsumingRecordsAfterFailureInSessionClose()
868870
{
869871
Session session = neo4j.driver().session();
@@ -1293,7 +1295,7 @@ void shouldErrorDatabaseWhenDatabaseIsAbsent() throws Throwable
12931295
result.consume();
12941296
} );
12951297

1296-
assertThat( error.getMessage(), containsString( "Database does not exists. Database name: 'foo'" ) );
1298+
assertThat( error.getMessage(), containsString( "Database does not exist. Database name: 'foo'" ) );
12971299
session.close();
12981300
}
12991301

@@ -1310,7 +1312,7 @@ void shouldErrorDatabaseNameUsingTxWhenDatabaseIsAbsent() throws Throwable
13101312
StatementResult result = transaction.run( "RETURN 1" );
13111313
result.consume();
13121314
});
1313-
assertThat( error.getMessage(), containsString( "Database does not exists. Database name: 'foo'" ) );
1315+
assertThat( error.getMessage(), containsString( "Database does not exist. Database name: 'foo'" ) );
13141316
session.close();
13151317
}
13161318

@@ -1325,7 +1327,7 @@ void shouldErrorDatabaseNameUsingTxWithRetriesWhenDatabaseIsAbsent() throws Thro
13251327
ClientException error = assertThrows( ClientException.class, () -> {
13261328
session.readTransaction( tx -> tx.run( "RETURN 1" ).consume() );
13271329
});
1328-
assertThat( error.getMessage(), containsString( "Database does not exists. Database name: 'foo'" ) );
1330+
assertThat( error.getMessage(), containsString( "Database does not exist. Database name: 'foo'" ) );
13291331
session.close();
13301332
}
13311333

driver/src/test/java/org/neo4j/driver/integration/async/AsyncSessionIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
import static org.neo4j.driver.internal.util.Matchers.containsResultAvailableAfterAndResultConsumedAfter;
8181
import static org.neo4j.driver.internal.util.Matchers.syntaxError;
8282
import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V3;
83+
import static org.neo4j.driver.internal.util.Neo4jFeature.NO_STREAMING;
8384
import static org.neo4j.driver.util.TestUtil.await;
8485
import static org.neo4j.driver.util.TestUtil.awaitAll;
8586

@@ -155,6 +156,7 @@ void shouldFailForIncorrectQuery()
155156
}
156157

157158
@Test
159+
@DisabledOnNeo4jWith( NO_STREAMING )
158160
void shouldFailWhenQueryFailsAtRuntime()
159161
{
160162
StatementResultCursor cursor = await( session.runAsync( "UNWIND [1, 2, 0] AS x RETURN 10 / x" ) );

driver/src/test/java/org/neo4j/driver/integration/reactive/RxStatementResultIT.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import org.neo4j.driver.Record;
2828
import org.neo4j.driver.exceptions.ClientException;
29+
import org.neo4j.driver.internal.util.DisabledOnNeo4jWith;
2930
import org.neo4j.driver.internal.util.EnabledOnNeo4jWith;
3031
import org.neo4j.driver.reactive.RxStatementResult;
3132
import org.neo4j.driver.reactive.RxSession;
@@ -44,6 +45,7 @@
4445
import static org.junit.jupiter.api.Assertions.assertTrue;
4546
import static org.neo4j.driver.Values.parameters;
4647
import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4;
48+
import static org.neo4j.driver.internal.util.Neo4jFeature.NO_STREAMING;
4749

4850
@EnabledOnNeo4jWith( BOLT_V4 )
4951
@ParallelizableIT
@@ -281,6 +283,7 @@ void shouldDiscardRecords()
281283
}
282284

283285
@Test
286+
@DisabledOnNeo4jWith( NO_STREAMING )
284287
void shouldStreamCorrectRecordsBackBeforeError()
285288
{
286289
RxSession session = neo4j.driver().rxSession();

driver/src/test/java/org/neo4j/driver/internal/util/Neo4jFeature.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
package org.neo4j.driver.internal.util;
2020

2121
import static java.util.Objects.requireNonNull;
22-
import static org.neo4j.driver.internal.util.ServerVersion.v3_1_0;
23-
import static org.neo4j.driver.internal.util.ServerVersion.v3_2_0;
2422
import static org.neo4j.driver.internal.util.ServerVersion.v3_4_0;
2523
import static org.neo4j.driver.internal.util.ServerVersion.v3_5_0;
2624
import static org.neo4j.driver.internal.util.ServerVersion.v4_0_0;
@@ -30,7 +28,8 @@ public enum Neo4jFeature
3028
SPATIAL_TYPES( v3_4_0 ),
3129
TEMPORAL_TYPES( v3_4_0 ),
3230
BOLT_V3( v3_5_0 ),
33-
BOLT_V4( v4_0_0 );
31+
BOLT_V4( v4_0_0 ),
32+
NO_STREAMING( v4_0_0 ); // the cypher cannot streaming records before error
3433

3534
private final ServerVersion availableFromVersion;
3635

driver/src/test/java/org/neo4j/driver/stress/CausalClusteringIT.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.ArrayList;
2828
import java.util.Arrays;
2929
import java.util.List;
30+
import java.util.Map;
3031
import java.util.Set;
3132
import java.util.concurrent.Callable;
3233
import java.util.concurrent.CompletionStage;
@@ -57,6 +58,7 @@
5758
import org.neo4j.driver.exceptions.ServiceUnavailableException;
5859
import org.neo4j.driver.exceptions.SessionExpiredException;
5960
import org.neo4j.driver.integration.NestedQueries;
61+
import org.neo4j.driver.internal.BoltServerAddress;
6062
import org.neo4j.driver.internal.cluster.RoutingSettings;
6163
import org.neo4j.driver.internal.retry.RetrySettings;
6264
import org.neo4j.driver.internal.util.DisabledOnNeo4jWith;
@@ -70,6 +72,8 @@
7072
import org.neo4j.driver.util.cc.ClusterExtension;
7173
import org.neo4j.driver.util.cc.ClusterMember;
7274
import org.neo4j.driver.util.cc.ClusterMemberRole;
75+
import org.neo4j.driver.util.cc.ClusterMemberRoleDiscoveryFactory;
76+
import org.neo4j.driver.util.cc.ClusterMemberRoleDiscoveryFactory.ClusterMemberRoleDiscovery;
7377

7478
import static java.util.concurrent.TimeUnit.MILLISECONDS;
7579
import static java.util.concurrent.TimeUnit.MINUTES;
@@ -844,18 +848,20 @@ private Driver discoverDriver( List<URI> routingUris )
844848
return GraphDatabase.routingDriver( routingUris, clusterRule.getDefaultAuthToken(), configWithoutLogging() );
845849
}
846850

847-
private ClusterOverview fetchClusterOverview( ClusterMember member )
851+
private static ClusterOverview fetchClusterOverview( ClusterMember member )
848852
{
849853
int leaderCount = 0;
850854
int followerCount = 0;
851855
int readReplicaCount = 0;
852856

853857
Driver driver = clusterRule.getCluster().getDirectDriver( member );
854-
try ( Session session = driver.session() )
858+
try
855859
{
856-
for ( Record record : session.run( "CALL dbms.cluster.overview()" ).list() )
860+
final ClusterMemberRoleDiscovery discovery = ClusterMemberRoleDiscoveryFactory.newInstance( ServerVersion.version( driver ) );
861+
final Map<BoltServerAddress,ClusterMemberRole> clusterOverview = discovery.findClusterOverview( driver );
862+
for ( BoltServerAddress address : clusterOverview.keySet() )
857863
{
858-
ClusterMemberRole role = ClusterMemberRole.valueOf( record.get( "role" ).asString() );
864+
ClusterMemberRole role = clusterOverview.get( address );
859865
if ( role == ClusterMemberRole.LEADER )
860866
{
861867
leaderCount++;

driver/src/test/java/org/neo4j/driver/stress/CausalClusteringStressIT.java

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -34,19 +34,19 @@
3434

3535
import org.neo4j.driver.AuthToken;
3636
import org.neo4j.driver.Driver;
37-
import org.neo4j.driver.Record;
38-
import org.neo4j.driver.Session;
3937
import org.neo4j.driver.exceptions.SessionExpiredException;
38+
import org.neo4j.driver.internal.BoltServerAddress;
39+
import org.neo4j.driver.internal.util.ServerVersion;
4040
import org.neo4j.driver.summary.ResultSummary;
4141
import org.neo4j.driver.util.cc.ClusterMemberRole;
42+
import org.neo4j.driver.util.cc.ClusterMemberRoleDiscoveryFactory;
4243
import org.neo4j.driver.util.cc.LocalOrRemoteClusterExtension;
4344

4445
import static org.hamcrest.Matchers.both;
4546
import static org.hamcrest.Matchers.greaterThan;
4647
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
4748
import static org.hamcrest.Matchers.lessThanOrEqualTo;
4849
import static org.hamcrest.junit.MatcherAssert.assertThat;
49-
import static org.neo4j.driver.util.cc.ClusterMember.SIMPLE_SCHEME;
5050

5151
class CausalClusteringStressIT extends AbstractStressTestBase<CausalClusteringStressIT.Context>
5252
{
@@ -133,23 +133,21 @@ private static ClusterAddresses fetchClusterAddresses( Driver driver )
133133
Set<String> followers = new HashSet<>();
134134
Set<String> readReplicas = new HashSet<>();
135135

136-
try ( Session session = driver.session() )
136+
final ClusterMemberRoleDiscoveryFactory.ClusterMemberRoleDiscovery discovery =
137+
ClusterMemberRoleDiscoveryFactory.newInstance( ServerVersion.version( driver ) );
138+
final Map<BoltServerAddress,ClusterMemberRole> clusterOverview = discovery.findClusterOverview( driver );
139+
140+
for ( BoltServerAddress address : clusterOverview.keySet() )
137141
{
138-
List<Record> records = session.run( "CALL dbms.cluster.overview()" ).list();
139-
for ( Record record : records )
142+
String boltAddress = String.format( "%s:%s", address.host(), address.port() );
143+
ClusterMemberRole role = clusterOverview.get( address );
144+
if ( role == ClusterMemberRole.FOLLOWER )
140145
{
141-
List<Object> addresses = record.get( "addresses" ).asList();
142-
String boltAddress = ((String) addresses.get( 0 )).replace( SIMPLE_SCHEME, "" );
143-
144-
ClusterMemberRole role = ClusterMemberRole.valueOf( record.get( "role" ).asString() );
145-
if ( role == ClusterMemberRole.FOLLOWER )
146-
{
147-
followers.add( boltAddress );
148-
}
149-
else if ( role == ClusterMemberRole.READ_REPLICA )
150-
{
151-
readReplicas.add( boltAddress );
152-
}
146+
followers.add( boltAddress );
147+
}
148+
else if ( role == ClusterMemberRole.READ_REPLICA )
149+
{
150+
readReplicas.add( boltAddress );
153151
}
154152
}
155153

driver/src/test/java/org/neo4j/driver/stress/RxWriteQuery.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public RxWriteQuery( AbstractStressTestBase<C> stressTest, Driver driver, boolea
4444
public CompletionStage<Void> execute( C context )
4545
{
4646
CompletableFuture<Void> queryFinished = new CompletableFuture<>();
47-
Flux.using( () -> newSession( AccessMode.READ, context ),
47+
Flux.using( () -> newSession( AccessMode.WRITE, context ),
4848
session -> session.run( "CREATE ()" ).summary(), RxSession::close )
4949
.subscribe( summary -> {
5050
queryFinished.complete( null );

driver/src/test/java/org/neo4j/driver/stress/RxWriteQueryInTx.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public RxWriteQueryInTx( AbstractStressTestBase<C> stressTest, Driver driver, bo
4545
public CompletionStage<Void> execute( C context )
4646
{
4747
CompletableFuture<Void> queryFinished = new CompletableFuture<>();
48-
RxSession session = newSession( AccessMode.READ, context );
48+
RxSession session = newSession( AccessMode.WRITE, context );
4949
Flux.usingWhen( session.beginTransaction(), tx -> tx.run( "CREATE ()" ).summary(), RxTransaction::commit, RxTransaction::rollback ).subscribe(
5050
summary -> {
5151
assertEquals( 1, summary.counters().nodesCreated() );

driver/src/test/java/org/neo4j/driver/util/cc/Cluster.java

Lines changed: 22 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -23,22 +23,19 @@
2323
import java.nio.file.Path;
2424
import java.util.HashSet;
2525
import java.util.List;
26+
import java.util.Map;
2627
import java.util.Set;
2728
import java.util.concurrent.ThreadLocalRandom;
2829
import java.util.concurrent.TimeUnit;
2930

30-
import org.neo4j.driver.internal.BoltServerAddress;
31-
import org.neo4j.driver.AccessMode;
3231
import org.neo4j.driver.Driver;
3332
import org.neo4j.driver.Record;
34-
import org.neo4j.driver.Session;
35-
import org.neo4j.driver.StatementResult;
33+
import org.neo4j.driver.internal.BoltServerAddress;
3634
import org.neo4j.driver.util.TestUtil;
35+
import org.neo4j.driver.util.cc.ClusterMemberRoleDiscoveryFactory.ClusterMemberRoleDiscovery;
3736

3837
import static java.util.Collections.emptySet;
3938
import static java.util.Collections.unmodifiableSet;
40-
import static org.neo4j.driver.internal.SessionConfig.builder;
41-
import static org.neo4j.driver.internal.util.Iterables.single;
4239
import static org.neo4j.driver.util.TestUtil.sleep;
4340

4441
public class Cluster implements AutoCloseable
@@ -221,21 +218,18 @@ private Set<ClusterMember> membersWithRole( ClusterMemberRole role )
221218
Set<ClusterMember> membersWithRole = new HashSet<>();
222219

223220
Driver driver = driverToAnyCore( members, clusterDrivers );
224-
try ( Session session = driver.session( builder().withDefaultAccessMode( AccessMode.READ ).build() ) )
221+
final ClusterMemberRoleDiscovery discovery = clusterDrivers.getDiscovery();
222+
final Map<BoltServerAddress,ClusterMemberRole> clusterOverview = discovery.findClusterOverview( driver );
223+
for ( BoltServerAddress boltAddress : clusterOverview.keySet() )
225224
{
226-
List<Record> records = findClusterOverview( session );
227-
for ( Record record : records )
225+
if ( role == clusterOverview.get( boltAddress ) )
228226
{
229-
if ( role == extractRole( record ) )
227+
ClusterMember member = findByBoltAddress( boltAddress, members );
228+
if ( member == null )
230229
{
231-
BoltServerAddress boltAddress = extractBoltAddress( record );
232-
ClusterMember member = findByBoltAddress( boltAddress, members );
233-
if ( member == null )
234-
{
235-
throw new IllegalStateException( "Unknown cluster member: '" + boltAddress + "'\n" + this );
236-
}
237-
membersWithRole.add( member );
230+
throw new IllegalStateException( "Unknown cluster member: '" + boltAddress + "'\n" + this );
238231
}
232+
membersWithRole.add( member );
239233
}
240234
}
241235

@@ -279,10 +273,15 @@ private static void waitForMembersToBeOnline( Set<ClusterMember> members, Cluste
279273
assertDeadlineNotReached( deadline, expectedOnlineAddresses, actualOnlineAddresses, error );
280274

281275
Driver driver = driverToAnyCore( members, clusterDrivers );
282-
try ( Session session = driver.session( builder().withDefaultAccessMode( AccessMode.READ ).build() ) )
276+
final ClusterMemberRoleDiscovery discovery = clusterDrivers.getDiscovery();
277+
try
283278
{
284-
List<Record> records = findClusterOverview( session );
285-
actualOnlineAddresses = extractBoltAddresses( records );
279+
final Map<BoltServerAddress,ClusterMemberRole> clusterOverview = discovery.findClusterOverview( driver );
280+
// we will wait until the leader is online
281+
if ( clusterOverview.containsValue( ClusterMemberRole.LEADER ) )
282+
{
283+
actualOnlineAddresses = clusterOverview.keySet();
284+
}
286285
}
287286
catch ( Throwable t )
288287
{
@@ -310,31 +309,16 @@ private static Driver driverToAnyCore( Set<ClusterMember> members, ClusterDriver
310309
for ( ClusterMember member : members )
311310
{
312311
Driver driver = clusterDrivers.getDriver( member );
313-
try ( Session session = driver.session( builder().withDefaultAccessMode( AccessMode.READ ).build() ) )
312+
final ClusterMemberRoleDiscovery discovery = clusterDrivers.getDiscovery();
313+
if ( discovery.isCoreMember( driver ) )
314314
{
315-
if ( isCoreMember( session ) )
316-
{
317-
return driver;
318-
}
315+
return driver;
319316
}
320317
}
321318

322319
throw new IllegalStateException( "No core members found among: " + members );
323320
}
324321

325-
private static List<Record> findClusterOverview( Session session )
326-
{
327-
StatementResult result = session.run( "CALL dbms.cluster.overview()" );
328-
return result.list();
329-
}
330-
331-
private static boolean isCoreMember( Session session )
332-
{
333-
Record record = single( session.run( "call dbms.cluster.role()" ).list() );
334-
ClusterMemberRole role = extractRole( record );
335-
return role != ClusterMemberRole.READ_REPLICA;
336-
}
337-
338322
private static void assertDeadlineNotReached( long deadline, Set<?> expectedAddresses, Set<?> actualAddresses,
339323
Throwable error ) throws ClusterUnavailableException
340324
{
@@ -367,17 +351,6 @@ private static Set<BoltServerAddress> extractBoltAddresses( Set<ClusterMember> m
367351
return addresses;
368352
}
369353

370-
private static Set<BoltServerAddress> extractBoltAddresses( List<Record> records )
371-
{
372-
Set<BoltServerAddress> addresses = new HashSet<>();
373-
for ( Record record : records )
374-
{
375-
BoltServerAddress boltAddress = extractBoltAddress( record );
376-
addresses.add( boltAddress );
377-
}
378-
return addresses;
379-
}
380-
381354
private static BoltServerAddress extractBoltAddress( Record record )
382355
{
383356
List<Object> addresses = record.get( "addresses" ).asList();

0 commit comments

Comments
 (0)