Skip to content

Commit 4434a0b

Browse files
authored
Merge pull request #355 from lutovich/1.3-read-without-writer
Read in absence of viable writer
2 parents 256d2b6 + c0fd4c9 commit 4434a0b

18 files changed

+662
-169
lines changed

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21-
import java.util.ArrayList;
22-
import java.util.List;
2321
import java.util.Map;
2422
import java.util.concurrent.atomic.AtomicBoolean;
2523

@@ -378,28 +376,4 @@ private void closeCurrentConnection( boolean sync )
378376
logger.debug( "Released connection " + connection.hashCode() );
379377
}
380378
}
381-
382-
private static List<Throwable> recordError( Throwable error, List<Throwable> errors )
383-
{
384-
if ( errors == null )
385-
{
386-
errors = new ArrayList<>();
387-
}
388-
errors.add( error );
389-
return errors;
390-
}
391-
392-
private static void addSuppressed( Throwable error, List<Throwable> suppressedErrors )
393-
{
394-
if ( suppressedErrors != null )
395-
{
396-
for ( Throwable suppressedError : suppressedErrors )
397-
{
398-
if ( error != suppressedError )
399-
{
400-
error.addSuppressed( suppressedError );
401-
}
402-
}
403-
}
404-
}
405379
}

driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import org.neo4j.driver.internal.net.BoltServerAddress;
2727
import org.neo4j.driver.internal.util.Clock;
28+
import org.neo4j.driver.v1.AccessMode;
2829

2930
import static java.lang.String.format;
3031
import static java.util.Arrays.asList;
@@ -34,7 +35,7 @@ public class ClusterRoutingTable implements RoutingTable
3435
private static final int MIN_ROUTERS = 1;
3536

3637
private final Clock clock;
37-
private long expirationTimeout;
38+
private volatile long expirationTimeout;
3839
private final RoundRobinAddressSet readers;
3940
private final RoundRobinAddressSet writers;
4041
private final RoundRobinAddressSet routers;
@@ -56,12 +57,12 @@ private ClusterRoutingTable( Clock clock )
5657
}
5758

5859
@Override
59-
public boolean isStale()
60+
public boolean isStaleFor( AccessMode mode )
6061
{
61-
return expirationTimeout < clock.millis() || // the expiration timeout has been reached
62-
routers.size() <= MIN_ROUTERS || // we need to discover more routing servers
63-
readers.size() == 0 || // we need to discover more read servers
64-
writers.size() == 0; // we need to discover more write servers
62+
return expirationTimeout < clock.millis() ||
63+
routers.size() <= MIN_ROUTERS ||
64+
mode == AccessMode.READ && readers.size() == 0 ||
65+
mode == AccessMode.WRITE && writers.size() == 0;
6566
}
6667

6768
@Override
@@ -115,7 +116,7 @@ public void removeWriter( BoltServerAddress toRemove )
115116

116117

117118
@Override
118-
public String toString()
119+
public synchronized String toString()
119120
{
120121
return format( "Ttl %s, currentTime %s, routers %s, writers %s, readers %s",
121122
expirationTimeout, clock.millis(), routers, writers, readers );

driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java

Lines changed: 32 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@
2929
import org.neo4j.driver.v1.AccessMode;
3030
import org.neo4j.driver.v1.Logger;
3131
import org.neo4j.driver.v1.Logging;
32-
import org.neo4j.driver.v1.exceptions.ProtocolException;
3332
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
33+
import org.neo4j.driver.v1.exceptions.SessionExpiredException;
3434

3535
public class LoadBalancer implements ConnectionProvider, RoutingErrorHandler, AutoCloseable
3636
{
@@ -61,15 +61,14 @@ private LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings,
6161
this.rediscovery = rediscovery;
6262
this.log = log;
6363

64-
// initialize the routing table
65-
ensureRouting();
64+
refreshRoutingTable();
6665
}
6766

6867
@Override
6968
public PooledConnection acquireConnection( AccessMode mode )
7069
{
7170
RoundRobinAddressSet addressSet = addressSetFor( mode );
72-
PooledConnection connection = acquireConnection( addressSet );
71+
PooledConnection connection = acquireConnection( mode, addressSet );
7372
return new RoutingPooledConnection( connection, this, mode );
7473
}
7574

@@ -91,26 +90,23 @@ public void close() throws Exception
9190
connections.close();
9291
}
9392

94-
private PooledConnection acquireConnection( RoundRobinAddressSet servers ) throws ServiceUnavailableException
93+
private PooledConnection acquireConnection( AccessMode mode, RoundRobinAddressSet servers )
9594
{
96-
for ( ; ; )
95+
ensureRouting( mode );
96+
for ( BoltServerAddress address; (address = servers.next()) != null; )
9797
{
98-
// refresh the routing table if needed
99-
ensureRouting();
100-
for ( BoltServerAddress address; (address = servers.next()) != null; )
98+
try
10199
{
102-
try
103-
{
104-
return connections.acquire( address );
105-
}
106-
catch ( ServiceUnavailableException e )
107-
{
108-
log.error( "Failed to obtain a connection towards address " + address, e );
109-
forget( address );
110-
}
100+
return connections.acquire( address );
101+
}
102+
catch ( ServiceUnavailableException e )
103+
{
104+
log.error( "Failed to obtain a connection towards address " + address, e );
105+
forget( address );
111106
}
112-
// if we get here, we failed to connect to any server, so we will rebuild the routing table
113107
}
108+
throw new SessionExpiredException(
109+
"Failed to obtain connection towards " + mode + " server. Known routing table is: " + routingTable );
114110
}
115111

116112
private synchronized void forget( BoltServerAddress address )
@@ -121,23 +117,28 @@ private synchronized void forget( BoltServerAddress address )
121117
connections.purge( address );
122118
}
123119

124-
synchronized void ensureRouting() throws ServiceUnavailableException, ProtocolException
120+
synchronized void ensureRouting( AccessMode mode )
125121
{
126-
if ( routingTable.isStale() )
122+
if ( routingTable.isStaleFor( mode ) )
127123
{
128-
log.info( "Routing information is stale. %s", routingTable );
124+
refreshRoutingTable();
125+
}
126+
}
129127

130-
// get a new routing table
131-
ClusterComposition cluster = rediscovery.lookupClusterComposition( connections, routingTable );
132-
Set<BoltServerAddress> removed = routingTable.update( cluster );
133-
// purge connections to removed addresses
134-
for ( BoltServerAddress address : removed )
135-
{
136-
connections.purge( address );
137-
}
128+
synchronized void refreshRoutingTable()
129+
{
130+
log.info( "Routing information is stale. %s", routingTable );
138131

139-
log.info( "Refreshed routing information. %s", routingTable );
132+
// get a new routing table
133+
ClusterComposition cluster = rediscovery.lookupClusterComposition( routingTable, connections );
134+
Set<BoltServerAddress> removed = routingTable.update( cluster );
135+
// purge connections to removed addresses
136+
for ( BoltServerAddress address : removed )
137+
{
138+
connections.purge( address );
140139
}
140+
141+
log.info( "Refreshed routing information. %s", routingTable );
141142
}
142143

143144
private RoundRobinAddressSet addressSetFor( AccessMode mode )

driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java

Lines changed: 73 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.neo4j.driver.internal.cluster;
2020

21+
import java.util.Collections;
2122
import java.util.HashSet;
2223
import java.util.Set;
2324

@@ -42,6 +43,8 @@ public class Rediscovery
4243
private final ClusterCompositionProvider provider;
4344
private final HostNameResolver hostNameResolver;
4445

46+
private boolean useInitialRouter;
47+
4548
public Rediscovery( BoltServerAddress initialRouter, RoutingSettings settings, Clock clock, Logger logger,
4649
ClusterCompositionProvider provider, HostNameResolver hostNameResolver )
4750
{
@@ -53,9 +56,15 @@ public Rediscovery( BoltServerAddress initialRouter, RoutingSettings settings, C
5356
this.hostNameResolver = hostNameResolver;
5457
}
5558

56-
// Given the current routing table and connection pool, use the connection composition provider to fetch a new
57-
// cluster composition, which would be used to update the routing table and connection pool
58-
public ClusterComposition lookupClusterComposition( ConnectionPool connections, RoutingTable routingTable )
59+
/**
60+
* Given the current routing table and connection pool, use the connection composition provider to fetch a new
61+
* cluster composition, which would be used to update the routing table and connection pool.
62+
*
63+
* @param routingTable current routing table.
64+
* @param connections connection pool.
65+
* @return new cluster composition.
66+
*/
67+
public ClusterComposition lookupClusterComposition( RoutingTable routingTable, ConnectionPool connections )
5968
{
6069
int failures = 0;
6170

@@ -65,7 +74,7 @@ public ClusterComposition lookupClusterComposition( ConnectionPool connections,
6574
sleep( waitTime );
6675
start = clock.millis();
6776

68-
ClusterComposition composition = lookupClusterCompositionOnKnownRouters( connections, routingTable );
77+
ClusterComposition composition = lookup( routingTable, connections );
6978
if ( composition != null )
7079
{
7180
return composition;
@@ -78,11 +87,56 @@ public ClusterComposition lookupClusterComposition( ConnectionPool connections,
7887
}
7988
}
8089

81-
private ClusterComposition lookupClusterCompositionOnKnownRouters( ConnectionPool connections,
82-
RoutingTable routingTable )
90+
private ClusterComposition lookup( RoutingTable routingTable, ConnectionPool connections )
91+
{
92+
ClusterComposition composition;
93+
94+
if ( useInitialRouter )
95+
{
96+
composition = lookupOnInitialRouterThenOnKnownRouters( routingTable, connections );
97+
useInitialRouter = false;
98+
}
99+
else
100+
{
101+
composition = lookupOnKnownRoutersThenOnInitialRouter( routingTable, connections );
102+
}
103+
104+
if ( composition != null && !composition.hasWriters() )
105+
{
106+
useInitialRouter = true;
107+
}
108+
109+
return composition;
110+
}
111+
112+
private ClusterComposition lookupOnKnownRoutersThenOnInitialRouter( RoutingTable routingTable,
113+
ConnectionPool connections )
114+
{
115+
Set<BoltServerAddress> seenServers = new HashSet<>();
116+
ClusterComposition composition = lookupOnKnownRouters( routingTable, connections, seenServers );
117+
if ( composition == null )
118+
{
119+
return lookupOnInitialRouter( routingTable, connections, seenServers );
120+
}
121+
return composition;
122+
}
123+
124+
private ClusterComposition lookupOnInitialRouterThenOnKnownRouters( RoutingTable routingTable,
125+
ConnectionPool connections )
126+
{
127+
Set<BoltServerAddress> seenServers = Collections.emptySet();
128+
ClusterComposition composition = lookupOnInitialRouter( routingTable, connections, seenServers );
129+
if ( composition == null )
130+
{
131+
return lookupOnKnownRouters( routingTable, connections, new HashSet<BoltServerAddress>() );
132+
}
133+
return composition;
134+
}
135+
136+
private ClusterComposition lookupOnKnownRouters( RoutingTable routingTable, ConnectionPool connections,
137+
Set<BoltServerAddress> seenServers )
83138
{
84139
int size = routingTable.routerSize();
85-
Set<BoltServerAddress> triedServers = new HashSet<>();
86140
for ( int i = 0; i < size; i++ )
87141
{
88142
BoltServerAddress address = routingTable.nextRouter();
@@ -91,22 +145,28 @@ private ClusterComposition lookupClusterCompositionOnKnownRouters( ConnectionPoo
91145
break;
92146
}
93147

94-
ClusterComposition composition = lookupClusterCompositionOnRouter( address, connections, routingTable );
148+
ClusterComposition composition = lookupOnRouter( address, routingTable, connections );
95149
if ( composition != null )
96150
{
97151
return composition;
98152
}
99153
else
100154
{
101-
triedServers.add( address );
155+
seenServers.add( address );
102156
}
103157
}
104158

159+
return null;
160+
}
161+
162+
private ClusterComposition lookupOnInitialRouter( RoutingTable routingTable,
163+
ConnectionPool connections, Set<BoltServerAddress> triedServers )
164+
{
105165
Set<BoltServerAddress> ips = hostNameResolver.resolve( initialRouter );
106166
ips.removeAll( triedServers );
107167
for ( BoltServerAddress address : ips )
108168
{
109-
ClusterComposition composition = lookupClusterCompositionOnRouter( address, connections, routingTable );
169+
ClusterComposition composition = lookupOnRouter( address, routingTable, connections );
110170
if ( composition != null )
111171
{
112172
return composition;
@@ -116,8 +176,8 @@ private ClusterComposition lookupClusterCompositionOnKnownRouters( ConnectionPoo
116176
return null;
117177
}
118178

119-
private ClusterComposition lookupClusterCompositionOnRouter( BoltServerAddress routerAddress,
120-
ConnectionPool connections, RoutingTable routingTable )
179+
private ClusterComposition lookupOnRouter( BoltServerAddress routerAddress, RoutingTable routingTable,
180+
ConnectionPool connections )
121181
{
122182
ClusterCompositionResponse response;
123183
try ( Connection connection = connections.acquire( routerAddress ) )
@@ -139,11 +199,7 @@ private ClusterComposition lookupClusterCompositionOnRouter( BoltServerAddress r
139199

140200
ClusterComposition cluster = response.clusterComposition();
141201
logger.info( "Got cluster composition %s", cluster );
142-
if ( cluster.hasWriters() )
143-
{
144-
return cluster;
145-
}
146-
return null;
202+
return cluster;
147203
}
148204

149205
private void sleep( long millis )

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,11 @@
2121
import java.util.Set;
2222

2323
import org.neo4j.driver.internal.net.BoltServerAddress;
24+
import org.neo4j.driver.v1.AccessMode;
2425

2526
public interface RoutingTable
2627
{
27-
boolean isStale();
28+
boolean isStaleFor( AccessMode mode );
2829

2930
Set<BoltServerAddress> update( ClusterComposition cluster );
3031

driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public SocketConnectionPool( PoolSettings poolSettings, Connector connector, Clo
6969
}
7070

7171
@Override
72-
public PooledConnection acquire( final BoltServerAddress address )
72+
public PooledConnection acquire( BoltServerAddress address )
7373
{
7474
assertNotClosed();
7575
BlockingPooledConnectionQueue connectionQueue = pool( address );

0 commit comments

Comments
 (0)