Skip to content

Commit c0fd4c9

Browse files
committed
Read in absence of viable writer
Previously driver did not allow reads and writes when received routing table did not contains both routers, readers and writers. This was inconsistent with Causal Cluster which allows reads when leader is absent. Leader might be unavailable for a long time (when there is a DC failure, etc.) so it makes sense to allow clients to perform read activity. This commit makes driver accept routing table with no writers and allow clients to perform read operations when writers are not available. It might be problematic when there is a cluster partition and one partition contains majority. For this case special care must be taken so driver does not get stuck talking only to the smaller partition which only knows about itself. This is done on best effort basis - driver tries to contact seed router if previously accepted routing table did not contain writes.
1 parent 256d2b6 commit c0fd4c9

18 files changed

+662
-169
lines changed

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21-
import java.util.ArrayList;
22-
import java.util.List;
2321
import java.util.Map;
2422
import java.util.concurrent.atomic.AtomicBoolean;
2523

@@ -378,28 +376,4 @@ private void closeCurrentConnection( boolean sync )
378376
logger.debug( "Released connection " + connection.hashCode() );
379377
}
380378
}
381-
382-
private static List<Throwable> recordError( Throwable error, List<Throwable> errors )
383-
{
384-
if ( errors == null )
385-
{
386-
errors = new ArrayList<>();
387-
}
388-
errors.add( error );
389-
return errors;
390-
}
391-
392-
private static void addSuppressed( Throwable error, List<Throwable> suppressedErrors )
393-
{
394-
if ( suppressedErrors != null )
395-
{
396-
for ( Throwable suppressedError : suppressedErrors )
397-
{
398-
if ( error != suppressedError )
399-
{
400-
error.addSuppressed( suppressedError );
401-
}
402-
}
403-
}
404-
}
405379
}

driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import org.neo4j.driver.internal.net.BoltServerAddress;
2727
import org.neo4j.driver.internal.util.Clock;
28+
import org.neo4j.driver.v1.AccessMode;
2829

2930
import static java.lang.String.format;
3031
import static java.util.Arrays.asList;
@@ -34,7 +35,7 @@ public class ClusterRoutingTable implements RoutingTable
3435
private static final int MIN_ROUTERS = 1;
3536

3637
private final Clock clock;
37-
private long expirationTimeout;
38+
private volatile long expirationTimeout;
3839
private final RoundRobinAddressSet readers;
3940
private final RoundRobinAddressSet writers;
4041
private final RoundRobinAddressSet routers;
@@ -56,12 +57,12 @@ private ClusterRoutingTable( Clock clock )
5657
}
5758

5859
@Override
59-
public boolean isStale()
60+
public boolean isStaleFor( AccessMode mode )
6061
{
61-
return expirationTimeout < clock.millis() || // the expiration timeout has been reached
62-
routers.size() <= MIN_ROUTERS || // we need to discover more routing servers
63-
readers.size() == 0 || // we need to discover more read servers
64-
writers.size() == 0; // we need to discover more write servers
62+
return expirationTimeout < clock.millis() ||
63+
routers.size() <= MIN_ROUTERS ||
64+
mode == AccessMode.READ && readers.size() == 0 ||
65+
mode == AccessMode.WRITE && writers.size() == 0;
6566
}
6667

6768
@Override
@@ -115,7 +116,7 @@ public void removeWriter( BoltServerAddress toRemove )
115116

116117

117118
@Override
118-
public String toString()
119+
public synchronized String toString()
119120
{
120121
return format( "Ttl %s, currentTime %s, routers %s, writers %s, readers %s",
121122
expirationTimeout, clock.millis(), routers, writers, readers );

driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java

Lines changed: 32 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@
2929
import org.neo4j.driver.v1.AccessMode;
3030
import org.neo4j.driver.v1.Logger;
3131
import org.neo4j.driver.v1.Logging;
32-
import org.neo4j.driver.v1.exceptions.ProtocolException;
3332
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
33+
import org.neo4j.driver.v1.exceptions.SessionExpiredException;
3434

3535
public class LoadBalancer implements ConnectionProvider, RoutingErrorHandler, AutoCloseable
3636
{
@@ -61,15 +61,14 @@ private LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings,
6161
this.rediscovery = rediscovery;
6262
this.log = log;
6363

64-
// initialize the routing table
65-
ensureRouting();
64+
refreshRoutingTable();
6665
}
6766

6867
@Override
6968
public PooledConnection acquireConnection( AccessMode mode )
7069
{
7170
RoundRobinAddressSet addressSet = addressSetFor( mode );
72-
PooledConnection connection = acquireConnection( addressSet );
71+
PooledConnection connection = acquireConnection( mode, addressSet );
7372
return new RoutingPooledConnection( connection, this, mode );
7473
}
7574

@@ -91,26 +90,23 @@ public void close() throws Exception
9190
connections.close();
9291
}
9392

94-
private PooledConnection acquireConnection( RoundRobinAddressSet servers ) throws ServiceUnavailableException
93+
private PooledConnection acquireConnection( AccessMode mode, RoundRobinAddressSet servers )
9594
{
96-
for ( ; ; )
95+
ensureRouting( mode );
96+
for ( BoltServerAddress address; (address = servers.next()) != null; )
9797
{
98-
// refresh the routing table if needed
99-
ensureRouting();
100-
for ( BoltServerAddress address; (address = servers.next()) != null; )
98+
try
10199
{
102-
try
103-
{
104-
return connections.acquire( address );
105-
}
106-
catch ( ServiceUnavailableException e )
107-
{
108-
log.error( "Failed to obtain a connection towards address " + address, e );
109-
forget( address );
110-
}
100+
return connections.acquire( address );
101+
}
102+
catch ( ServiceUnavailableException e )
103+
{
104+
log.error( "Failed to obtain a connection towards address " + address, e );
105+
forget( address );
111106
}
112-
// if we get here, we failed to connect to any server, so we will rebuild the routing table
113107
}
108+
throw new SessionExpiredException(
109+
"Failed to obtain connection towards " + mode + " server. Known routing table is: " + routingTable );
114110
}
115111

116112
private synchronized void forget( BoltServerAddress address )
@@ -121,23 +117,28 @@ private synchronized void forget( BoltServerAddress address )
121117
connections.purge( address );
122118
}
123119

124-
synchronized void ensureRouting() throws ServiceUnavailableException, ProtocolException
120+
synchronized void ensureRouting( AccessMode mode )
125121
{
126-
if ( routingTable.isStale() )
122+
if ( routingTable.isStaleFor( mode ) )
127123
{
128-
log.info( "Routing information is stale. %s", routingTable );
124+
refreshRoutingTable();
125+
}
126+
}
129127

130-
// get a new routing table
131-
ClusterComposition cluster = rediscovery.lookupClusterComposition( connections, routingTable );
132-
Set<BoltServerAddress> removed = routingTable.update( cluster );
133-
// purge connections to removed addresses
134-
for ( BoltServerAddress address : removed )
135-
{
136-
connections.purge( address );
137-
}
128+
synchronized void refreshRoutingTable()
129+
{
130+
log.info( "Routing information is stale. %s", routingTable );
138131

139-
log.info( "Refreshed routing information. %s", routingTable );
132+
// get a new routing table
133+
ClusterComposition cluster = rediscovery.lookupClusterComposition( routingTable, connections );
134+
Set<BoltServerAddress> removed = routingTable.update( cluster );
135+
// purge connections to removed addresses
136+
for ( BoltServerAddress address : removed )
137+
{
138+
connections.purge( address );
140139
}
140+
141+
log.info( "Refreshed routing information. %s", routingTable );
141142
}
142143

143144
private RoundRobinAddressSet addressSetFor( AccessMode mode )

driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java

Lines changed: 73 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.neo4j.driver.internal.cluster;
2020

21+
import java.util.Collections;
2122
import java.util.HashSet;
2223
import java.util.Set;
2324

@@ -42,6 +43,8 @@ public class Rediscovery
4243
private final ClusterCompositionProvider provider;
4344
private final HostNameResolver hostNameResolver;
4445

46+
private boolean useInitialRouter;
47+
4548
public Rediscovery( BoltServerAddress initialRouter, RoutingSettings settings, Clock clock, Logger logger,
4649
ClusterCompositionProvider provider, HostNameResolver hostNameResolver )
4750
{
@@ -53,9 +56,15 @@ public Rediscovery( BoltServerAddress initialRouter, RoutingSettings settings, C
5356
this.hostNameResolver = hostNameResolver;
5457
}
5558

56-
// Given the current routing table and connection pool, use the connection composition provider to fetch a new
57-
// cluster composition, which would be used to update the routing table and connection pool
58-
public ClusterComposition lookupClusterComposition( ConnectionPool connections, RoutingTable routingTable )
59+
/**
60+
* Given the current routing table and connection pool, use the connection composition provider to fetch a new
61+
* cluster composition, which would be used to update the routing table and connection pool.
62+
*
63+
* @param routingTable current routing table.
64+
* @param connections connection pool.
65+
* @return new cluster composition.
66+
*/
67+
public ClusterComposition lookupClusterComposition( RoutingTable routingTable, ConnectionPool connections )
5968
{
6069
int failures = 0;
6170

@@ -65,7 +74,7 @@ public ClusterComposition lookupClusterComposition( ConnectionPool connections,
6574
sleep( waitTime );
6675
start = clock.millis();
6776

68-
ClusterComposition composition = lookupClusterCompositionOnKnownRouters( connections, routingTable );
77+
ClusterComposition composition = lookup( routingTable, connections );
6978
if ( composition != null )
7079
{
7180
return composition;
@@ -78,11 +87,56 @@ public ClusterComposition lookupClusterComposition( ConnectionPool connections,
7887
}
7988
}
8089

81-
private ClusterComposition lookupClusterCompositionOnKnownRouters( ConnectionPool connections,
82-
RoutingTable routingTable )
90+
private ClusterComposition lookup( RoutingTable routingTable, ConnectionPool connections )
91+
{
92+
ClusterComposition composition;
93+
94+
if ( useInitialRouter )
95+
{
96+
composition = lookupOnInitialRouterThenOnKnownRouters( routingTable, connections );
97+
useInitialRouter = false;
98+
}
99+
else
100+
{
101+
composition = lookupOnKnownRoutersThenOnInitialRouter( routingTable, connections );
102+
}
103+
104+
if ( composition != null && !composition.hasWriters() )
105+
{
106+
useInitialRouter = true;
107+
}
108+
109+
return composition;
110+
}
111+
112+
private ClusterComposition lookupOnKnownRoutersThenOnInitialRouter( RoutingTable routingTable,
113+
ConnectionPool connections )
114+
{
115+
Set<BoltServerAddress> seenServers = new HashSet<>();
116+
ClusterComposition composition = lookupOnKnownRouters( routingTable, connections, seenServers );
117+
if ( composition == null )
118+
{
119+
return lookupOnInitialRouter( routingTable, connections, seenServers );
120+
}
121+
return composition;
122+
}
123+
124+
private ClusterComposition lookupOnInitialRouterThenOnKnownRouters( RoutingTable routingTable,
125+
ConnectionPool connections )
126+
{
127+
Set<BoltServerAddress> seenServers = Collections.emptySet();
128+
ClusterComposition composition = lookupOnInitialRouter( routingTable, connections, seenServers );
129+
if ( composition == null )
130+
{
131+
return lookupOnKnownRouters( routingTable, connections, new HashSet<BoltServerAddress>() );
132+
}
133+
return composition;
134+
}
135+
136+
private ClusterComposition lookupOnKnownRouters( RoutingTable routingTable, ConnectionPool connections,
137+
Set<BoltServerAddress> seenServers )
83138
{
84139
int size = routingTable.routerSize();
85-
Set<BoltServerAddress> triedServers = new HashSet<>();
86140
for ( int i = 0; i < size; i++ )
87141
{
88142
BoltServerAddress address = routingTable.nextRouter();
@@ -91,22 +145,28 @@ private ClusterComposition lookupClusterCompositionOnKnownRouters( ConnectionPoo
91145
break;
92146
}
93147

94-
ClusterComposition composition = lookupClusterCompositionOnRouter( address, connections, routingTable );
148+
ClusterComposition composition = lookupOnRouter( address, routingTable, connections );
95149
if ( composition != null )
96150
{
97151
return composition;
98152
}
99153
else
100154
{
101-
triedServers.add( address );
155+
seenServers.add( address );
102156
}
103157
}
104158

159+
return null;
160+
}
161+
162+
private ClusterComposition lookupOnInitialRouter( RoutingTable routingTable,
163+
ConnectionPool connections, Set<BoltServerAddress> triedServers )
164+
{
105165
Set<BoltServerAddress> ips = hostNameResolver.resolve( initialRouter );
106166
ips.removeAll( triedServers );
107167
for ( BoltServerAddress address : ips )
108168
{
109-
ClusterComposition composition = lookupClusterCompositionOnRouter( address, connections, routingTable );
169+
ClusterComposition composition = lookupOnRouter( address, routingTable, connections );
110170
if ( composition != null )
111171
{
112172
return composition;
@@ -116,8 +176,8 @@ private ClusterComposition lookupClusterCompositionOnKnownRouters( ConnectionPoo
116176
return null;
117177
}
118178

119-
private ClusterComposition lookupClusterCompositionOnRouter( BoltServerAddress routerAddress,
120-
ConnectionPool connections, RoutingTable routingTable )
179+
private ClusterComposition lookupOnRouter( BoltServerAddress routerAddress, RoutingTable routingTable,
180+
ConnectionPool connections )
121181
{
122182
ClusterCompositionResponse response;
123183
try ( Connection connection = connections.acquire( routerAddress ) )
@@ -139,11 +199,7 @@ private ClusterComposition lookupClusterCompositionOnRouter( BoltServerAddress r
139199

140200
ClusterComposition cluster = response.clusterComposition();
141201
logger.info( "Got cluster composition %s", cluster );
142-
if ( cluster.hasWriters() )
143-
{
144-
return cluster;
145-
}
146-
return null;
202+
return cluster;
147203
}
148204

149205
private void sleep( long millis )

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,11 @@
2121
import java.util.Set;
2222

2323
import org.neo4j.driver.internal.net.BoltServerAddress;
24+
import org.neo4j.driver.v1.AccessMode;
2425

2526
public interface RoutingTable
2627
{
27-
boolean isStale();
28+
boolean isStaleFor( AccessMode mode );
2829

2930
Set<BoltServerAddress> update( ClusterComposition cluster );
3031

driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public SocketConnectionPool( PoolSettings poolSettings, Connector connector, Clo
6969
}
7070

7171
@Override
72-
public PooledConnection acquire( final BoltServerAddress address )
72+
public PooledConnection acquire( BoltServerAddress address )
7373
{
7474
assertNotClosed();
7575
BlockingPooledConnectionQueue connectionQueue = pool( address );

0 commit comments

Comments
 (0)