Skip to content

Commit a197ccd

Browse files
committed
Updated handling of getServers response
`getServers` now only returns a single row containing all the information, together with a `ttl` that is no longer per server but a global `ttl` for the returned servers.
1 parent 9263030 commit a197ccd

File tree

8 files changed

+128
-129
lines changed

8 files changed

+128
-129
lines changed

driver/src/main/java/org/neo4j/driver/internal/ClusterDriver.java

Lines changed: 76 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
/**
22
* Copyright (c) 2002-2016 "Neo Technology,"
33
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4-
*
4+
* <p>
55
* This file is part of Neo4j.
6-
*
6+
* <p>
77
* Licensed under the Apache License, Version 2.0 (the "License");
88
* you may not use this file except in compliance with the License.
99
* You may obtain a copy of the License at
10-
*
11-
* http://www.apache.org/licenses/LICENSE-2.0
12-
*
10+
* <p>
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
* <p>
1313
* Unless required by applicable law or agreed to in writing, software
1414
* distributed under the License is distributed on an "AS IS" BASIS,
1515
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,6 +20,7 @@
2020

2121
import java.util.Collections;
2222
import java.util.Comparator;
23+
import java.util.List;
2324
import java.util.Set;
2425

2526
import org.neo4j.driver.internal.net.BoltServerAddress;
@@ -34,10 +35,12 @@
3435
import org.neo4j.driver.v1.Record;
3536
import org.neo4j.driver.v1.Session;
3637
import org.neo4j.driver.v1.StatementResult;
38+
import org.neo4j.driver.v1.Value;
3739
import org.neo4j.driver.v1.exceptions.ClientException;
3840
import org.neo4j.driver.v1.exceptions.ConnectionFailureException;
3941
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
4042
import org.neo4j.driver.v1.util.BiFunction;
43+
import org.neo4j.driver.v1.util.Function;
4144

4245
import static java.lang.String.format;
4346

@@ -50,7 +53,7 @@ public class ClusterDriver extends BaseDriver
5053
public int compare( BoltServerAddress o1, BoltServerAddress o2 )
5154
{
5255
int compare = o1.host().compareTo( o2.host() );
53-
if (compare == 0)
56+
if ( compare == 0 )
5457
{
5558
compare = Integer.compare( o1.port(), o2.port() );
5659
}
@@ -60,16 +63,17 @@ public int compare( BoltServerAddress o1, BoltServerAddress o2 )
6063
};
6164
private static final int MIN_SERVERS = 2;
6265
private final ConnectionPool connections;
63-
private final BiFunction<Connection,Logger, Session> sessionProvider;
66+
private final BiFunction<Connection,Logger,Session> sessionProvider;
6467

65-
private final ConcurrentRoundRobinSet<BoltServerAddress> routingServers = new ConcurrentRoundRobinSet<>(COMPARATOR);
66-
private final ConcurrentRoundRobinSet<BoltServerAddress> readServers = new ConcurrentRoundRobinSet<>(COMPARATOR);
67-
private final ConcurrentRoundRobinSet<BoltServerAddress> writeServers = new ConcurrentRoundRobinSet<>(COMPARATOR);
68+
private final ConcurrentRoundRobinSet<BoltServerAddress> routingServers =
69+
new ConcurrentRoundRobinSet<>( COMPARATOR );
70+
private final ConcurrentRoundRobinSet<BoltServerAddress> readServers = new ConcurrentRoundRobinSet<>( COMPARATOR );
71+
private final ConcurrentRoundRobinSet<BoltServerAddress> writeServers = new ConcurrentRoundRobinSet<>( COMPARATOR );
6872

6973
public ClusterDriver( BoltServerAddress seedAddress,
7074
ConnectionPool connections,
7175
SecurityPlan securityPlan,
72-
BiFunction<Connection,Logger, Session> sessionProvider,
76+
BiFunction<Connection,Logger,Session> sessionProvider,
7377
Logging logging )
7478
{
7579
super( securityPlan, logging );
@@ -85,7 +89,7 @@ private void checkServers()
8589
{
8690
if ( routingServers.size() < MIN_SERVERS ||
8791
readServers.isEmpty() ||
88-
writeServers.isEmpty())
92+
writeServers.isEmpty() )
8993
{
9094
getServers();
9195
}
@@ -107,18 +111,22 @@ private void getServers()
107111
@Override
108112
public void accept( Record record )
109113
{
110-
BoltServerAddress newAddress = new BoltServerAddress( record.get( "address" ).asString() );
111-
switch ( record.get( "mode" ).asString().toUpperCase() )
114+
long ttl = record.get( "ttl" ).asLong();
115+
List<ServerInfo> servers = servers( record );
116+
for ( ServerInfo server : servers )
112117
{
113-
case "READ":
114-
readServers.add( newAddress );
115-
break;
116-
case "WRITE":
117-
writeServers.add( newAddress );
118-
break;
119-
case "ROUTE":
120-
routingServers.add( newAddress );
121-
break;
118+
switch ( server.role() )
119+
{
120+
case "READ":
121+
readServers.addAll( server.addresses() );
122+
break;
123+
case "WRITE":
124+
writeServers.addAll( server.addresses() );
125+
break;
126+
case "ROUTE":
127+
routingServers.addAll( server.addresses() );
128+
break;
129+
}
122130
}
123131
}
124132
} );
@@ -137,7 +145,7 @@ public void accept( Record record )
137145
this.close();
138146
throw new ServiceUnavailableException(
139147
String.format( "Server %s couldn't perform discovery",
140-
address == null ? "`UNKNOWN`" : address.toString()), ex );
148+
address == null ? "`UNKNOWN`" : address.toString() ), ex );
141149
}
142150
else
143151
{
@@ -146,14 +154,55 @@ public void accept( Record record )
146154
}
147155
}
148156

157+
private static class ServerInfo
158+
{
159+
private final List<BoltServerAddress> addresses;
160+
private final String role;
161+
162+
public ServerInfo( List<BoltServerAddress> addresses, String role )
163+
{
164+
this.addresses = addresses;
165+
this.role = role;
166+
}
167+
168+
public String role()
169+
{
170+
return role;
171+
}
172+
173+
List<BoltServerAddress> addresses()
174+
{
175+
return addresses;
176+
}
177+
}
178+
179+
private List<ServerInfo> servers( Record record )
180+
{
181+
return record.get( "servers" ).asList( new Function<Value,ServerInfo>()
182+
{
183+
@Override
184+
public ServerInfo apply( Value value )
185+
{
186+
return new ServerInfo( value.get("addresses").asList( new Function<Value,BoltServerAddress>()
187+
{
188+
@Override
189+
public BoltServerAddress apply( Value value )
190+
{
191+
return new BoltServerAddress( value.asString() );
192+
}
193+
} ), value.get("role").asString() );
194+
}
195+
} );
196+
}
197+
149198
//must be called from a synchronized method
150199
private boolean call( BoltServerAddress address, String procedureName, Consumer<Record> recorder )
151200
{
152201
Connection acquire = null;
153202
Session session = null;
154203
try
155204
{
156-
acquire = connections.acquire(address);
205+
acquire = connections.acquire( address );
157206
session = sessionProvider.apply( acquire, log );
158207

159208
StatementResult records = session.run( format( "CALL %s", procedureName ) );
@@ -255,13 +304,13 @@ Set<BoltServerAddress> routingServers()
255304
//For testing
256305
Set<BoltServerAddress> readServers()
257306
{
258-
return Collections.unmodifiableSet(readServers);
307+
return Collections.unmodifiableSet( readServers );
259308
}
260309

261310
//For testing
262311
Set<BoltServerAddress> writeServers()
263312
{
264-
return Collections.unmodifiableSet( writeServers);
313+
return Collections.unmodifiableSet( writeServers );
265314
}
266315

267316
//For testing

driver/src/test/java/org/neo4j/driver/internal/ClusterDriverTest.java

Lines changed: 40 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
/**
22
* Copyright (c) 2002-2016 "Neo Technology,"
33
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4-
*
4+
* <p>
55
* This file is part of Neo4j.
6-
*
6+
* <p>
77
* Licensed under the Apache License, Version 2.0 (the "License");
88
* you may not use this file except in compliance with the License.
99
* You may obtain a copy of the License at
10-
*
11-
* http://www.apache.org/licenses/LICENSE-2.0
12-
*
10+
* <p>
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
* <p>
1313
* Unless required by applicable law or agreed to in writing, software
1414
* distributed under the License is distributed on an "AS IS" BASIS,
1515
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -24,14 +24,13 @@
2424
import org.junit.rules.ExpectedException;
2525

2626
import java.util.Collections;
27-
import java.util.Iterator;
27+
import java.util.HashMap;
2828
import java.util.List;
29+
import java.util.Map;
2930

3031
import org.neo4j.driver.internal.net.BoltServerAddress;
3132
import org.neo4j.driver.internal.spi.Connection;
3233
import org.neo4j.driver.internal.spi.ConnectionPool;
33-
import org.neo4j.driver.internal.value.IntegerValue;
34-
import org.neo4j.driver.internal.value.StringValue;
3534
import org.neo4j.driver.v1.AccessMode;
3635
import org.neo4j.driver.v1.Logger;
3736
import org.neo4j.driver.v1.Logging;
@@ -56,6 +55,7 @@
5655
import static org.mockito.Mockito.mock;
5756
import static org.mockito.Mockito.when;
5857
import static org.neo4j.driver.internal.security.SecurityPlan.insecure;
58+
import static org.neo4j.driver.v1.Values.value;
5959

6060
public class ClusterDriverTest
6161
{
@@ -64,17 +64,17 @@ public class ClusterDriverTest
6464

6565
private static final BoltServerAddress SEED = new BoltServerAddress( "localhost", 7687 );
6666
private static final String GET_SERVERS = "CALL dbms.cluster.routing.getServers";
67-
private static final List<BoltServerAddress> NO_ADDRESSES = Collections.<BoltServerAddress>emptyList();
67+
private static final List<String> NO_ADDRESSES = Collections.emptyList();
6868

6969
@Test
7070
public void shouldDoRoutingOnInitialization()
7171
{
7272
// Given
7373
final Session session = mock( Session.class );
7474
when( session.run( GET_SERVERS ) ).thenReturn(
75-
getServers( singletonList( boltAddress( "localhost", 1111 ) ),
76-
singletonList( boltAddress( "localhost", 2222 ) ),
77-
singletonList( boltAddress( "localhost", 3333 ) ) ) );
75+
getServers( singletonList( "localhost:1111" ),
76+
singletonList( "localhost:2222" ),
77+
singletonList( "localhost:3333" ) ) );
7878

7979
// When
8080
ClusterDriver clusterDriver = forSession( session );
@@ -96,11 +96,11 @@ public void shouldDoReRoutingOnSessionAcquisitionIfNecessary()
9696
final Session session = mock( Session.class );
9797
when( session.run( GET_SERVERS ) )
9898
.thenReturn(
99-
getServers( singletonList( boltAddress( "localhost", 1111 ) ), NO_ADDRESSES, NO_ADDRESSES ) )
99+
getServers( singletonList( "localhost:1111" ), NO_ADDRESSES, NO_ADDRESSES ) )
100100
.thenReturn(
101-
getServers( singletonList( boltAddress( "localhost", 1112 ) ),
102-
singletonList( boltAddress( "localhost", 2222 ) ),
103-
singletonList( boltAddress( "localhost", 3333 ) ) ) );
101+
getServers( singletonList( "localhost:1112" ),
102+
singletonList( "localhost:2222" ),
103+
singletonList( "localhost:3333" ) ) );
104104

105105
ClusterDriver clusterDriver = forSession( session );
106106

@@ -129,12 +129,11 @@ public void shouldNotDoReRoutingOnSessionAcquisitionIfNotNecessary()
129129
final Session session = mock( Session.class );
130130
when( session.run( GET_SERVERS ) )
131131
.thenReturn(
132-
getServers( asList( boltAddress( "localhost", 1111 ), boltAddress( "localhost", 1112 ),
133-
boltAddress( "localhost", 1113 ) ),
134-
singletonList( boltAddress( "localhost", 2222 ) ),
135-
singletonList( boltAddress( "localhost", 3333 ) ) ) )
132+
getServers( asList( "localhost:1111", "localhost:1112", "localhost:1113" ),
133+
singletonList( "localhost:2222" ),
134+
singletonList( "localhost:3333" ) ) )
136135
.thenReturn(
137-
getServers( singletonList( boltAddress( "localhost", 5555 ) ), NO_ADDRESSES, NO_ADDRESSES ) );
136+
getServers( singletonList( "localhost:5555" ), NO_ADDRESSES, NO_ADDRESSES ) );
138137

139138
ClusterDriver clusterDriver = forSession( session );
140139

@@ -180,59 +179,34 @@ private BoltServerAddress boltAddress( String host, int port )
180179
return new BoltServerAddress( host, port );
181180
}
182181

183-
StatementResult getServers( final List<BoltServerAddress> routers, final List<BoltServerAddress> readers,
184-
final List<BoltServerAddress> writers )
182+
StatementResult getServers( final List<String> routers, final List<String> readers,
183+
final List<String> writers )
185184
{
186-
187-
188185
return new StatementResult()
189186
{
190-
private final int totalSize = routers.size() + readers.size() + writers.size();
191-
private final Iterator<BoltServerAddress> routeIterator = routers.iterator();
192-
private final Iterator<BoltServerAddress> readIterator = readers.iterator();
193-
private final Iterator<BoltServerAddress> writeIterator = writers.iterator();
194187
private int counter = 0;
195188

196189
@Override
197190
public List<String> keys()
198191
{
199-
return asList( "address", "mode", "expires" );
192+
return asList( "ttl", "servers" );
200193
}
201194

202195
@Override
203196
public boolean hasNext()
204197
{
205-
return counter++ < totalSize;
198+
return counter++ < 1;
206199
}
207200

208201
@Override
209202
public Record next()
210203
{
211-
if ( routeIterator.hasNext() )
212-
{
213-
return new InternalRecord( asList( "address", "mode", "expires" ),
214-
new Value[]{new StringValue( routeIterator.next().toString() ),
215-
new StringValue( "ROUTE" ),
216-
new IntegerValue( Long.MAX_VALUE )} );
217-
}
218-
else if ( readIterator.hasNext() )
219-
{
220-
return new InternalRecord( asList( "address", "mode", "expires" ),
221-
new Value[]{new StringValue( readIterator.next().toString() ),
222-
new StringValue( "READ" ),
223-
new IntegerValue( Long.MAX_VALUE )} );
224-
}
225-
else if ( writeIterator.hasNext() )
226-
{
227-
return new InternalRecord( asList( "address", "mode", "expires" ),
228-
new Value[]{new StringValue( writeIterator.next().toString() ),
229-
new StringValue( "WRITE" ),
230-
new IntegerValue( Long.MAX_VALUE )} );
231-
}
232-
else
233-
{
234-
return Collections.<Record>emptyIterator().next();
235-
}
204+
return new InternalRecord( asList( "ttl", "servers" ),
205+
new Value[]{
206+
value( Long.MAX_VALUE ),
207+
value( asList( serverInfo( "ROUTE", routers ), serverInfo( "WRITE", writers ),
208+
serverInfo( "READ", readers ) ) )
209+
} );
236210
}
237211

238212
@Override
@@ -268,11 +242,20 @@ public ResultSummary consume()
268242
@Override
269243
public void remove()
270244
{
271-
throw new UnsupportedOperationException( );
245+
throw new UnsupportedOperationException();
272246
}
273247
};
274248
}
275249

250+
private Map<String,Object> serverInfo( String role, List<String> addresses )
251+
{
252+
Map<String,Object> map = new HashMap<>();
253+
map.put( "role", role );
254+
map.put( "addresses", addresses );
255+
256+
return map;
257+
}
258+
276259
private ConnectionPool pool()
277260
{
278261
ConnectionPool pool = mock( ConnectionPool.class );

0 commit comments

Comments
 (0)