Skip to content

Commit 88f0daa

Browse files
committed
Better handling of cores and read replicas
Also added cluster startup timeout.
1 parent 2a5e365 commit 88f0daa

File tree

4 files changed

+112
-25
lines changed

4 files changed

+112
-25
lines changed

driver/src/main/java/org/neo4j/driver/internal/util/Iterables.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,21 @@ public static <T> List<T> asList( Iterable<T> it )
5252
return list;
5353
}
5454

55+
public static <T> T single( Iterable<T> it )
56+
{
57+
Iterator<T> iterator = it.iterator();
58+
if ( !iterator.hasNext() )
59+
{
60+
throw new IllegalArgumentException( "Given iterable is empty" );
61+
}
62+
T result = iterator.next();
63+
if ( iterator.hasNext() )
64+
{
65+
throw new IllegalArgumentException( "Given iterable contains more than one element: " + it );
66+
}
67+
return result;
68+
}
69+
5570
public static Map<String, String> map( String ... alternatingKeyValue )
5671
{
5772
Map<String, String> out = new HashMap<>();

driver/src/test/java/org/neo4j/driver/v1/util/cc/Cluster.java

Lines changed: 59 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.util.Collections;
2424
import java.util.HashSet;
2525
import java.util.List;
26-
import java.util.Objects;
2726
import java.util.Set;
2827
import java.util.concurrent.ThreadLocalRandom;
2928
import java.util.concurrent.TimeUnit;
@@ -38,11 +37,13 @@
3837
import org.neo4j.driver.v1.Session;
3938
import org.neo4j.driver.v1.StatementResult;
4039

40+
import static org.neo4j.driver.internal.util.Iterables.single;
4141
import static org.neo4j.driver.v1.Config.TrustStrategy.trustAllCertificates;
4242

4343
public class Cluster
4444
{
4545
private static final String ADMIN_USER = "neo4j";
46+
private static final int STARTUP_TIMEOUT_SECONDS = 60;
4647

4748
private final Path path;
4849
private final String password;
@@ -53,15 +54,16 @@ public Cluster( Path path, String password )
5354
this( path, password, Collections.<ClusterMember>emptySet() );
5455
}
5556

56-
public Cluster( Path path, String password, Set<ClusterMember> members )
57+
private Cluster( Path path, String password, Set<ClusterMember> members )
5758
{
58-
this.path = Objects.requireNonNull( path );
59+
this.path = path;
5960
this.password = password;
60-
this.members = waitForMembers( password, members );
61+
this.members = members;
6162
}
6263

63-
Cluster withMembers( Set<ClusterMember> newMembers )
64+
Cluster withMembers( Set<ClusterMember> newMembers ) throws ClusterUnavailableException
6465
{
66+
waitForMembers( newMembers, password );
6567
return new Cluster( path, password, newMembers );
6668
}
6769

@@ -84,7 +86,6 @@ public void accept( Session session )
8486

8587
public ClusterMember leaderTx( Consumer<Session> tx )
8688
{
87-
// todo: handle leader switches
8889
ClusterMember leader = leader();
8990
try ( Driver driver = createDriver( leader.getBoltUri(), password );
9091
Session session = driver.session() )
@@ -125,18 +126,6 @@ public Set<ClusterMember> readReplicas()
125126
return membersWithRole( ClusterMemberRole.READ_REPLICA );
126127
}
127128

128-
private static Driver createDriver( String password, Set<ClusterMember> members )
129-
{
130-
if ( members.isEmpty() )
131-
{
132-
throw new IllegalArgumentException( "No members, can't create driver" );
133-
}
134-
135-
ClusterMember firstMember = members.iterator().next();
136-
URI boltUri = firstMember.getBoltUri();
137-
return createDriver( boltUri, password );
138-
}
139-
140129
@Override
141130
public String toString()
142131
{
@@ -150,7 +139,7 @@ private Set<ClusterMember> membersWithRole( ClusterMemberRole role )
150139
{
151140
Set<ClusterMember> membersWithRole = new HashSet<>();
152141

153-
try ( Driver driver = createDriver( password, members );
142+
try ( Driver driver = createDriver( members, password );
154143
Session session = driver.session( AccessMode.READ ) )
155144
{
156145
StatementResult result = session.run( "call dbms.cluster.overview()" );
@@ -177,20 +166,23 @@ private Set<ClusterMember> membersWithRole( ClusterMemberRole role )
177166
return membersWithRole;
178167
}
179168

180-
private static Set<ClusterMember> waitForMembers( String password, Set<ClusterMember> members )
169+
private static Set<ClusterMember> waitForMembers( Set<ClusterMember> members, String password )
170+
throws ClusterUnavailableException
181171
{
182172
if ( members.isEmpty() )
183173
{
184-
return members;
174+
throw new IllegalArgumentException( "No members to wait for" );
185175
}
186176

187177
Set<ClusterMember> offlineMembers = new HashSet<>( members );
178+
long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis( STARTUP_TIMEOUT_SECONDS );
188179

189-
try ( Driver driver = createDriver( password, members ) )
180+
try ( Driver driver = createDriver( members, password ) )
190181
{
191-
// todo: add some timeout
192182
while ( !offlineMembers.isEmpty() )
193183
{
184+
assertDeadlineNotReached( deadline );
185+
194186
try ( Session session = driver.session( AccessMode.READ ) )
195187
{
196188
StatementResult result = session.run( "call dbms.cluster.overview()" );
@@ -211,11 +203,55 @@ private static Set<ClusterMember> waitForMembers( String password, Set<ClusterMe
211203
return members;
212204
}
213205

206+
private static Driver createDriver( Set<ClusterMember> members, String password )
207+
{
208+
if ( members.isEmpty() )
209+
{
210+
throw new IllegalArgumentException( "No members, can't create driver" );
211+
}
212+
213+
for ( ClusterMember member : members )
214+
{
215+
Driver driver = createDriver( member.getBoltUri(), password );
216+
try ( Session session = driver.session( AccessMode.READ ) )
217+
{
218+
if ( isCoreMember( session ) )
219+
{
220+
return driver;
221+
}
222+
}
223+
catch ( Exception e )
224+
{
225+
driver.close();
226+
throw e;
227+
}
228+
}
229+
230+
throw new IllegalStateException( "No core members found among: " + members );
231+
}
232+
214233
private static Driver createDriver( URI boltUri, String password )
215234
{
216235
return GraphDatabase.driver( boltUri, AuthTokens.basic( ADMIN_USER, password ), driverConfig() );
217236
}
218237

238+
private static boolean isCoreMember( Session session )
239+
{
240+
Record record = single( session.run( "call dbms.cluster.role" ).list() );
241+
String roleName = record.get( "role" ).asString();
242+
ClusterMemberRole role = ClusterMemberRole.valueOf( roleName.toUpperCase() );
243+
return role != ClusterMemberRole.READ_REPLICA;
244+
}
245+
246+
private static void assertDeadlineNotReached( long deadline ) throws ClusterUnavailableException
247+
{
248+
if ( System.currentTimeMillis() > deadline )
249+
{
250+
throw new ClusterUnavailableException(
251+
"Cluster did not become available in " + STARTUP_TIMEOUT_SECONDS + " seconds" );
252+
}
253+
}
254+
219255
private static URI extractBoltUri( Record record )
220256
{
221257
List<Object> addresses = record.get( "addresses" ).asList();
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/**
2+
* Copyright (c) 2002-2016 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.v1.util.cc;
20+
21+
class ClusterUnavailableException extends Exception
22+
{
23+
ClusterUnavailableException( String message )
24+
{
25+
super( message );
26+
}
27+
}

driver/src/test/java/org/neo4j/driver/v1/util/cc/SharedCluster.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,21 @@ static void install( String neo4jVersion, int cores, int readReplicas, String pa
5252
clusterInstance = new Cluster( path, password );
5353
}
5454

55-
static void start()
55+
static void start() throws ClusterUnavailableException
5656
{
5757
assertClusterExists();
5858
String output = ClusterControl.startCluster( clusterInstance.getPath() );
5959
Set<ClusterMember> members = parseStartCommandOutput( output );
60-
clusterInstance = clusterInstance.withMembers( members );
60+
61+
try
62+
{
63+
clusterInstance = clusterInstance.withMembers( members );
64+
}
65+
catch ( ClusterUnavailableException e )
66+
{
67+
kill();
68+
throw e;
69+
}
6170
}
6271

6372
static void stop()

0 commit comments

Comments
 (0)