Skip to content

Commit 740db84

Browse files
committed
Added ttl handling
The returned `ttl` from server is now respected. Whenever we do acquisition we check if the information is stale, if so we do new `getServers` call.
1 parent d8b6c43 commit 740db84

File tree

4 files changed

+110
-24
lines changed

4 files changed

+110
-24
lines changed

driver/src/main/java/org/neo4j/driver/internal/ClusterDriver.java

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
/**
22
* Copyright (c) 2002-2016 "Neo Technology,"
33
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4-
* <p>
4+
*
55
* This file is part of Neo4j.
6-
* <p>
6+
*
77
* Licensed under the Apache License, Version 2.0 (the "License");
88
* you may not use this file except in compliance with the License.
99
* You may obtain a copy of the License at
10-
* <p>
11-
* http://www.apache.org/licenses/LICENSE-2.0
12-
* <p>
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
1313
* Unless required by applicable law or agreed to in writing, software
1414
* distributed under the License is distributed on an "AS IS" BASIS,
1515
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -23,11 +23,13 @@
2323
import java.util.HashSet;
2424
import java.util.List;
2525
import java.util.Set;
26+
import java.util.concurrent.atomic.AtomicLong;
2627

2728
import org.neo4j.driver.internal.net.BoltServerAddress;
2829
import org.neo4j.driver.internal.security.SecurityPlan;
2930
import org.neo4j.driver.internal.spi.Connection;
3031
import org.neo4j.driver.internal.spi.ConnectionPool;
32+
import org.neo4j.driver.internal.util.Clock;
3133
import org.neo4j.driver.internal.util.ConcurrentRoundRobinSet;
3234
import org.neo4j.driver.internal.util.Consumer;
3335
import org.neo4j.driver.v1.AccessRole;
@@ -48,6 +50,7 @@
4850
public class ClusterDriver extends BaseDriver
4951
{
5052
private static final String GET_SERVERS = "dbms.cluster.routing.getServers";
53+
private static final long MAX_TTL = Long.MAX_VALUE / 1000L;
5154
private final static Comparator<BoltServerAddress> COMPARATOR = new Comparator<BoltServerAddress>()
5255
{
5356
@Override
@@ -65,30 +68,34 @@ public int compare( BoltServerAddress o1, BoltServerAddress o2 )
6568
private static final int MIN_SERVERS = 2;
6669
private final ConnectionPool connections;
6770
private final BiFunction<Connection,Logger,Session> sessionProvider;
68-
71+
private final Clock clock;
6972
private final ConcurrentRoundRobinSet<BoltServerAddress> routingServers =
7073
new ConcurrentRoundRobinSet<>( COMPARATOR );
7174
private final ConcurrentRoundRobinSet<BoltServerAddress> readServers = new ConcurrentRoundRobinSet<>( COMPARATOR );
7275
private final ConcurrentRoundRobinSet<BoltServerAddress> writeServers = new ConcurrentRoundRobinSet<>( COMPARATOR );
76+
private final AtomicLong expires = new AtomicLong( 0L );
7377

7478
public ClusterDriver( BoltServerAddress seedAddress,
7579
ConnectionPool connections,
7680
SecurityPlan securityPlan,
7781
BiFunction<Connection,Logger,Session> sessionProvider,
82+
Clock clock,
7883
Logging logging )
7984
{
8085
super( securityPlan, logging );
8186
routingServers.add( seedAddress );
8287
this.connections = connections;
8388
this.sessionProvider = sessionProvider;
89+
this.clock = clock;
8490
checkServers();
8591
}
8692

8793
private void checkServers()
8894
{
8995
synchronized ( routingServers )
9096
{
91-
if ( routingServers.size() < MIN_SERVERS ||
97+
if ( expires.get() < clock.millis() ||
98+
routingServers.size() < MIN_SERVERS ||
9299
readServers.isEmpty() ||
93100
writeServers.isEmpty() )
94101
{
@@ -99,7 +106,7 @@ private void checkServers()
99106

100107
private Set<BoltServerAddress> forgetAllServers()
101108
{
102-
final Set<BoltServerAddress> seen = new HashSet<>( );
109+
final Set<BoltServerAddress> seen = new HashSet<>();
103110
seen.addAll( routingServers );
104111
seen.addAll( readServers );
105112
seen.addAll( writeServers );
@@ -109,6 +116,20 @@ private Set<BoltServerAddress> forgetAllServers()
109116
return seen;
110117
}
111118

119+
private long calculateNewExpiry( Record record )
120+
{
121+
long ttl = record.get( "ttl" ).asLong();
122+
long nextExpiry = clock.millis() + 1000L * ttl;
123+
if ( ttl < 0 || ttl >= MAX_TTL || nextExpiry < 0 )
124+
{
125+
return Long.MAX_VALUE;
126+
}
127+
else
128+
{
129+
return nextExpiry;
130+
}
131+
}
132+
112133
//must be called from a synchronized block
113134
private void getServers()
114135
{
@@ -127,7 +148,7 @@ private void getServers()
127148
@Override
128149
public void accept( Record record )
129150
{
130-
long ttl = record.get( "ttl" ).asLong();
151+
expires.set( calculateNewExpiry( record ) );
131152
List<ServerInfo> servers = servers( record );
132153
for ( ServerInfo server : servers )
133154
{
@@ -206,14 +227,14 @@ private List<ServerInfo> servers( Record record )
206227
@Override
207228
public ServerInfo apply( Value value )
208229
{
209-
return new ServerInfo( value.get("addresses").asList( new Function<Value,BoltServerAddress>()
230+
return new ServerInfo( value.get( "addresses" ).asList( new Function<Value,BoltServerAddress>()
210231
{
211232
@Override
212233
public BoltServerAddress apply( Value value )
213234
{
214235
return new BoltServerAddress( value.asString() );
215236
}
216-
} ), value.get("role").asString() );
237+
} ), value.get( "role" ).asString() );
217238
}
218239
} );
219240
}

driver/src/main/java/org/neo4j/driver/v1/GraphDatabase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.neo4j.driver.internal.security.SecurityPlan;
3333
import org.neo4j.driver.internal.spi.Connection;
3434
import org.neo4j.driver.internal.spi.ConnectionPool;
35+
import org.neo4j.driver.internal.util.Clock;
3536
import org.neo4j.driver.v1.exceptions.ClientException;
3637
import org.neo4j.driver.v1.util.BiFunction;
3738

@@ -189,7 +190,7 @@ public static Driver driver( URI uri, AuthToken authToken, Config config )
189190
case "bolt":
190191
return new DirectDriver( address, connectionPool, securityPlan, config.logging() );
191192
case "bolt+routing":
192-
return new ClusterDriver( address, connectionPool, securityPlan, SESSION_PROVIDER, config.logging() );
193+
return new ClusterDriver( address, connectionPool, securityPlan, SESSION_PROVIDER, Clock.SYSTEM, config.logging() );
193194
default:
194195
throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) );
195196
}

driver/src/test/java/org/neo4j/driver/internal/ClusterDriverStubTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
/**
22
* Copyright (c) 2002-2016 "Neo Technology,"
33
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4-
* <p>
4+
*
55
* This file is part of Neo4j.
6-
* <p>
6+
*
77
* Licensed under the Apache License, Version 2.0 (the "License");
88
* you may not use this file except in compliance with the License.
99
* You may obtain a copy of the License at
10-
* <p>
11-
* http://www.apache.org/licenses/LICENSE-2.0
12-
* <p>
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
1313
* Unless required by applicable law or agreed to in writing, software
1414
* distributed under the License is distributed on an "AS IS" BASIS,
1515
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

driver/src/test/java/org/neo4j/driver/internal/ClusterDriverTest.java

Lines changed: 71 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
/**
22
* Copyright (c) 2002-2016 "Neo Technology,"
33
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4-
* <p>
4+
*
55
* This file is part of Neo4j.
6-
* <p>
6+
*
77
* Licensed under the Apache License, Version 2.0 (the "License");
88
* you may not use this file except in compliance with the License.
99
* You may obtain a copy of the License at
10-
* <p>
11-
* http://www.apache.org/licenses/LICENSE-2.0
12-
* <p>
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
1313
* Unless required by applicable law or agreed to in writing, software
1414
* distributed under the License is distributed on an "AS IS" BASIS,
1515
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -31,6 +31,7 @@
3131
import org.neo4j.driver.internal.net.BoltServerAddress;
3232
import org.neo4j.driver.internal.spi.Connection;
3333
import org.neo4j.driver.internal.spi.ConnectionPool;
34+
import org.neo4j.driver.internal.util.Clock;
3435
import org.neo4j.driver.v1.AccessRole;
3536
import org.neo4j.driver.v1.Logger;
3637
import org.neo4j.driver.v1.Logging;
@@ -191,7 +192,63 @@ public void shouldForgetAboutServersOnRerouting()
191192
verify( pool ).purge( boltAddress( "localhost", 1111 ) );
192193
}
193194

195+
@Test
196+
public void shouldRediscoverOnTimeout()
197+
{
198+
// Given
199+
final Session session = mock( Session.class );
200+
Clock clock = mock( Clock.class );
201+
when(clock.millis()).thenReturn( 0L, 11000L, 22000L );
202+
when( session.run( GET_SERVERS ) )
203+
.thenReturn(
204+
getServers( asList( "localhost:1111", "localhost:1112", "localhost:1113" ),
205+
singletonList( "localhost:2222" ),
206+
singletonList( "localhost:3333" ), 10L/*seconds*/ ) )
207+
.thenReturn(
208+
getServers( singletonList( "localhost:5555" ), singletonList( "localhost:5555" ), singletonList( "localhost:5555" ) ) );
209+
210+
ClusterDriver clusterDriver = forSession( session, clock );
211+
212+
// When
213+
clusterDriver.session( AccessRole.WRITE );
214+
215+
// Then
216+
assertThat( clusterDriver.routingServers(), containsInAnyOrder( boltAddress( "localhost", 5555 ) ) );
217+
assertThat( clusterDriver.readServers(), containsInAnyOrder( boltAddress( "localhost", 5555 ) ) );
218+
assertThat( clusterDriver.writeServers(), containsInAnyOrder( boltAddress( "localhost", 5555 ) ) );
219+
}
220+
221+
@Test
222+
public void shouldNotRediscoverWheNoTimeout()
223+
{
224+
// Given
225+
final Session session = mock( Session.class );
226+
Clock clock = mock( Clock.class );
227+
when(clock.millis()).thenReturn( 0L, 9900L, 18800L );
228+
when( session.run( GET_SERVERS ) )
229+
.thenReturn(
230+
getServers( asList( "localhost:1111", "localhost:1112", "localhost:1113" ),
231+
singletonList( "localhost:2222" ),
232+
singletonList( "localhost:3333" ), 10L/*seconds*/ ) )
233+
.thenReturn(
234+
getServers( singletonList( "localhost:5555" ), singletonList( "localhost:5555" ), singletonList( "localhost:5555" ) ) );
235+
236+
ClusterDriver clusterDriver = forSession( session, clock );
237+
238+
// When
239+
clusterDriver.session( AccessRole.WRITE );
240+
241+
// Then
242+
assertThat( clusterDriver.routingServers(), containsInAnyOrder( boltAddress( "localhost", 1111 ), boltAddress( "localhost", 1112 ), boltAddress( "localhost", 1113 ) ) );
243+
assertThat( clusterDriver.readServers(), containsInAnyOrder( boltAddress( "localhost", 2222 ) ) );
244+
assertThat( clusterDriver.writeServers(), containsInAnyOrder( boltAddress( "localhost", 3333 ) ) );
245+
}
246+
194247
private ClusterDriver forSession( final Session session )
248+
{
249+
return forSession( session, Clock.SYSTEM );
250+
}
251+
private ClusterDriver forSession( final Session session, Clock clock )
195252
{
196253
return new ClusterDriver( SEED, pool, insecure(),
197254
new BiFunction<Connection,Logger,Session>()
@@ -201,16 +258,23 @@ public Session apply( Connection connection, Logger ignore )
201258
{
202259
return session;
203260
}
204-
}, logging() );
261+
}, clock, logging() );
205262
}
206263

207264
private BoltServerAddress boltAddress( String host, int port )
208265
{
209266
return new BoltServerAddress( host, port );
210267
}
211268

269+
212270
StatementResult getServers( final List<String> routers, final List<String> readers,
213271
final List<String> writers )
272+
{
273+
return getServers( routers,readers, writers, Long.MAX_VALUE );
274+
}
275+
276+
StatementResult getServers( final List<String> routers, final List<String> readers,
277+
final List<String> writers, final long ttl )
214278
{
215279
return new StatementResult()
216280
{
@@ -233,7 +297,7 @@ public Record next()
233297
{
234298
return new InternalRecord( asList( "ttl", "servers" ),
235299
new Value[]{
236-
value( Long.MAX_VALUE ),
300+
value( ttl ),
237301
value( asList( serverInfo( "ROUTE", routers ), serverInfo( "WRITE", writers ),
238302
serverInfo( "READ", readers ) ) )
239303
} );

0 commit comments

Comments
 (0)