Skip to content

Commit 5308646

Browse files
committed
Make routing discovery less picky on errors
1 parent ec60c32 commit 5308646

File tree

7 files changed

+171
-56
lines changed

7 files changed

+171
-56
lines changed

driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ private CompletionStage<ClusterComposition> lookupOnRouter( BoltServerAddress ro
246246
}
247247
else
248248
{
249-
return response.clusterComposition();
249+
return handleClusterComposition( routerAddress, response );
250250
}
251251
} );
252252
}
@@ -268,6 +268,22 @@ private ClusterComposition handleRoutingProcedureError( Throwable error, Routing
268268
}
269269
}
270270

271+
private ClusterComposition handleClusterComposition( BoltServerAddress routerAddress, ClusterCompositionResponse response )
272+
{
273+
ClusterComposition result = null;
274+
275+
try
276+
{
277+
result = response.clusterComposition();
278+
}
279+
catch ( Exception exc )
280+
{
281+
logger.warn( format( "Unable to process routing table received from '%s'.", routerAddress ), exc );
282+
}
283+
284+
return result;
285+
}
286+
271287
private List<BoltServerAddress> resolve( BoltServerAddress address )
272288
{
273289
return resolver.resolve( address )

driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java

Lines changed: 88 additions & 36 deletions
Large diffs are not rendered by default.

driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import static java.util.concurrent.CompletableFuture.completedFuture;
4646
import static org.junit.jupiter.api.Assertions.assertEquals;
4747
import static org.junit.jupiter.api.Assertions.assertNotNull;
48+
import static org.junit.jupiter.api.Assertions.assertNull;
4849
import static org.junit.jupiter.api.Assertions.assertThrows;
4950
import static org.mockito.ArgumentMatchers.any;
5051
import static org.mockito.Mockito.mock;
@@ -163,12 +164,17 @@ void shouldFailImmediatelyWhenClusterCompositionProviderReturnsFailure()
163164
responsesByAddress.put( B, new Failure( protocolError ) ); // first -> fatal failure
164165
responsesByAddress.put( C, new Success( validComposition ) ); // second -> valid cluster composition
165166

167+
Logger logger = mock( Logger.class );
168+
166169
ClusterCompositionProvider compositionProvider = compositionProviderMock( responsesByAddress );
167-
Rediscovery rediscovery = newRediscovery( A, compositionProvider, mock( ServerAddressResolver.class ) );
170+
Rediscovery rediscovery = newRediscovery( A, compositionProvider, mock( ServerAddressResolver.class ), logger );
168171
RoutingTable table = routingTableMock( B, C );
169172

170-
ProtocolException error = assertThrows( ProtocolException.class, () -> await( rediscovery.lookupClusterComposition( table, pool ) ) );
171-
assertEquals( protocolError, error );
173+
// When
174+
ClusterComposition composition = await( rediscovery.lookupClusterComposition( table, pool ) );
175+
assertEquals( validComposition, composition );
176+
177+
verify( logger ).warn( String.format( "Unable to process routing table received from '%s'.", B ), protocolError );
172178
}
173179

174180
@Test
@@ -302,7 +308,7 @@ void shouldUseInitialRouterToStartWith()
302308

303309
ClusterCompositionProvider compositionProvider = compositionProviderMock( responsesByAddress );
304310
ServerAddressResolver resolver = resolverMock( initialRouter, initialRouter );
305-
Rediscovery rediscovery = newRediscovery( initialRouter, compositionProvider, resolver, true );
311+
Rediscovery rediscovery = newRediscovery( initialRouter, compositionProvider, resolver, mock( Logger.class ), true );
306312
RoutingTable table = routingTableMock( B, C, D );
307313

308314
ClusterComposition composition = await( rediscovery.lookupClusterComposition( table, pool ) );
@@ -323,7 +329,7 @@ void shouldUseKnownRoutersWhenInitialRouterFails()
323329

324330
ClusterCompositionProvider compositionProvider = compositionProviderMock( responsesByAddress );
325331
ServerAddressResolver resolver = resolverMock( initialRouter, initialRouter );
326-
Rediscovery rediscovery = newRediscovery( initialRouter, compositionProvider, resolver, true );
332+
Rediscovery rediscovery = newRediscovery( initialRouter, compositionProvider, resolver, mock( Logger.class ), true );
327333
RoutingTable table = routingTableMock( D, E );
328334

329335
ClusterComposition composition = await( rediscovery.lookupClusterComposition( table, pool ) );
@@ -393,15 +399,21 @@ void shouldNotLogWhenSingleRetryAttemptFails()
393399
private Rediscovery newRediscovery( BoltServerAddress initialRouter, ClusterCompositionProvider compositionProvider,
394400
ServerAddressResolver resolver )
395401
{
396-
return newRediscovery( initialRouter, compositionProvider, resolver, false );
402+
return newRediscovery( initialRouter, compositionProvider, resolver, DEV_NULL_LOGGER );
403+
}
404+
405+
private Rediscovery newRediscovery( BoltServerAddress initialRouter, ClusterCompositionProvider compositionProvider,
406+
ServerAddressResolver resolver, Logger logger )
407+
{
408+
return newRediscovery( initialRouter, compositionProvider, resolver, logger, false );
397409
}
398410

399411
private Rediscovery newRediscovery( BoltServerAddress initialRouter, ClusterCompositionProvider compositionProvider,
400-
ServerAddressResolver resolver, boolean useInitialRouter )
412+
ServerAddressResolver resolver, Logger logger, boolean useInitialRouter )
401413
{
402414
RoutingSettings settings = new RoutingSettings( 1, 0 );
403415
return new Rediscovery( initialRouter, settings, compositionProvider, resolver,
404-
GlobalEventExecutor.INSTANCE, DEV_NULL_LOGGER, useInitialRouter );
416+
GlobalEventExecutor.INSTANCE, logger, useInitialRouter );
405417
}
406418

407419
@SuppressWarnings( "unchecked" )

driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ void sessionCreationShouldFailIfCallingDiscoveryProcedureOnEdgeServer()
167167

168168
ClusterMember readReplica = cluster.anyReadReplica();
169169
ServiceUnavailableException e = assertThrows( ServiceUnavailableException.class, () -> createDriver( readReplica.getRoutingUri() ) );
170-
assertThat( e.getMessage(), containsString( "Failed to run 'CALL dbms.cluster.routing" ) );
170+
assertThat( e.getMessage(), containsString( "Could not perform discovery. No routing servers available." ) );
171171
}
172172

173173
// Ensure that Bookmarks work with single instances using a driver created using a bolt[not+routing] URI.
@@ -193,7 +193,7 @@ void bookmarksShouldWorkWithDriverPinnedToSingleServer() throws Exception
193193
assertNotNull( bookmark );
194194

195195
try ( Session session = driver.session( bookmark );
196-
Transaction tx = session.beginTransaction() )
196+
Transaction tx = session.beginTransaction() )
197197
{
198198
Record record = tx.run( "MATCH (n:Person) RETURN COUNT(*) AS count" ).next();
199199
assertEquals( 1, record.get( "count" ).asInt() );
@@ -309,7 +309,7 @@ void beginTransactionThrowsForInvalidBookmark()
309309
ClusterMember leader = clusterRule.getCluster().leader();
310310

311311
try ( Driver driver = createDriver( leader.getBoltUri() );
312-
Session session = driver.session( invalidBookmark ) )
312+
Session session = driver.session( invalidBookmark ) )
313313
{
314314
ClientException e = assertThrows( ClientException.class, session::beginTransaction );
315315
assertThat( e.getMessage(), containsString( invalidBookmark ) );
@@ -323,7 +323,7 @@ void beginTransactionThrowsForUnreachableBookmark()
323323
ClusterMember leader = clusterRule.getCluster().leader();
324324

325325
try ( Driver driver = createDriver( leader.getBoltUri() );
326-
Session session = driver.session() )
326+
Session session = driver.session() )
327327
{
328328
try ( Transaction tx = session.beginTransaction() )
329329
{
@@ -373,7 +373,7 @@ void shouldHandleGracefulLeaderSwitch() throws Exception
373373
} );
374374

375375
try ( Session session2 = driver.session( AccessMode.READ, bookmark );
376-
Transaction tx2 = session2.beginTransaction() )
376+
Transaction tx2 = session2.beginTransaction() )
377377
{
378378
Record record = tx2.run( "MATCH (n:Person) RETURN COUNT(*) AS count" ).next();
379379
tx2.success();
@@ -1059,8 +1059,8 @@ private static boolean isSingleFollowerWithReadReplicas( ClusterOverview overvie
10591059
return false;
10601060
}
10611061
return overview.leaderCount == 0 &&
1062-
overview.followerCount == 1 &&
1063-
overview.readReplicaCount == ClusterExtension.READ_REPLICA_COUNT;
1062+
overview.followerCount == 1 &&
1063+
overview.readReplicaCount == ClusterExtension.READ_REPLICA_COUNT;
10641064
}
10651065

10661066
private static void makeAllChannelsFailToRunQueries( ChannelTrackingDriverFactory driverFactory, ServerVersion dbVersion )
@@ -1108,10 +1108,10 @@ private static class ClusterOverview
11081108
public String toString()
11091109
{
11101110
return "ClusterOverview{" +
1111-
"leaderCount=" + leaderCount +
1112-
", followerCount=" + followerCount +
1113-
", readReplicaCount=" + readReplicaCount +
1114-
'}';
1111+
"leaderCount=" + leaderCount +
1112+
", followerCount=" + followerCount +
1113+
", readReplicaCount=" + readReplicaCount +
1114+
'}';
11151115
}
11161116
}
11171117
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
!: BOLT 3
2+
!: AUTO RESET
3+
4+
C: HELLO {"scheme": "none", "user_agent": "neo4j-java/dev"}
5+
S: SUCCESS {"server": "Neo4j/9.9.9", "connection_id": "bolt-123456789"}
6+
C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {}
7+
PULL_ALL
8+
S: SUCCESS {"fields": ["ttl", "servers"]}
9+
RECORD [9223372036854775807, []]
10+
SUCCESS {}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
!: BOLT 3
2+
!: AUTO RESET
3+
4+
C: HELLO {"scheme": "none", "user_agent": "neo4j-java/dev"}
5+
S: SUCCESS {"server": "Neo4j/9.9.9", "connection_id": "bolt-123456789"}
6+
C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {}
7+
PULL_ALL
8+
S: SUCCESS {"fields": ["ttl", "servers"]}
9+
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9010"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9011"], "role": "READ"},{"addresses": ["127.0.0.1:9004"], "role": "ROUTE"}]]
10+
SUCCESS {}
11+
C: RESET
12+
S: SUCCESS {}
13+
<EXIT>
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
!: BOLT 3
2+
3+
C: HELLO {"scheme": "none", "user_agent": "neo4j-java/dev"}
4+
S: SUCCESS {"server": "Neo4j/9.9.9", "connection_id": "bolt-123456789"}
5+
C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {}
6+
PULL_ALL
7+
S: SUCCESS {"fields": ["ttl", "servers"]}
8+
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9001"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9002","127.0.0.1:9003"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"], "role": "ROUTE"}]]
9+
SUCCESS {}
10+
C: RESET
11+
S: SUCCESS {}
12+
<EXIT>

0 commit comments

Comments
 (0)