Skip to content

Read in absence of viable writer #355

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 0 additions & 26 deletions driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.neo4j.driver.internal;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -378,28 +376,4 @@ private void closeCurrentConnection( boolean sync )
logger.debug( "Released connection " + connection.hashCode() );
}
}

private static List<Throwable> recordError( Throwable error, List<Throwable> errors )
{
if ( errors == null )
{
errors = new ArrayList<>();
}
errors.add( error );
return errors;
}

private static void addSuppressed( Throwable error, List<Throwable> suppressedErrors )
{
if ( suppressedErrors != null )
{
for ( Throwable suppressedError : suppressedErrors )
{
if ( error != suppressedError )
{
error.addSuppressed( suppressedError );
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.internal.util.Clock;
import org.neo4j.driver.v1.AccessMode;

import static java.lang.String.format;
import static java.util.Arrays.asList;
Expand All @@ -34,7 +35,7 @@ public class ClusterRoutingTable implements RoutingTable
private static final int MIN_ROUTERS = 1;

private final Clock clock;
private long expirationTimeout;
private volatile long expirationTimeout;
private final RoundRobinAddressSet readers;
private final RoundRobinAddressSet writers;
private final RoundRobinAddressSet routers;
Expand All @@ -56,12 +57,12 @@ private ClusterRoutingTable( Clock clock )
}

@Override
public boolean isStale()
public boolean isStaleFor( AccessMode mode )
{
return expirationTimeout < clock.millis() || // the expiration timeout has been reached
routers.size() <= MIN_ROUTERS || // we need to discover more routing servers
readers.size() == 0 || // we need to discover more read servers
writers.size() == 0; // we need to discover more write servers
return expirationTimeout < clock.millis() ||
routers.size() <= MIN_ROUTERS ||
mode == AccessMode.READ && readers.size() == 0 ||
mode == AccessMode.WRITE && writers.size() == 0;
}

@Override
Expand Down Expand Up @@ -115,7 +116,7 @@ public void removeWriter( BoltServerAddress toRemove )


@Override
public String toString()
public synchronized String toString()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's revisit this synchronized code in another PR.

{
return format( "Ttl %s, currentTime %s, routers %s, writers %s, readers %s",
expirationTimeout, clock.millis(), routers, writers, readers );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import org.neo4j.driver.v1.AccessMode;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Logging;
import org.neo4j.driver.v1.exceptions.ProtocolException;
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
import org.neo4j.driver.v1.exceptions.SessionExpiredException;

public class LoadBalancer implements ConnectionProvider, RoutingErrorHandler, AutoCloseable
{
Expand Down Expand Up @@ -61,15 +61,14 @@ private LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings,
this.rediscovery = rediscovery;
this.log = log;

// initialize the routing table
ensureRouting();
refreshRoutingTable();
}

@Override
public PooledConnection acquireConnection( AccessMode mode )
{
RoundRobinAddressSet addressSet = addressSetFor( mode );
PooledConnection connection = acquireConnection( addressSet );
PooledConnection connection = acquireConnection( mode, addressSet );
return new RoutingPooledConnection( connection, this, mode );
}

Expand All @@ -91,26 +90,23 @@ public void close() throws Exception
connections.close();
}

private PooledConnection acquireConnection( RoundRobinAddressSet servers ) throws ServiceUnavailableException
private PooledConnection acquireConnection( AccessMode mode, RoundRobinAddressSet servers )
{
for ( ; ; )
ensureRouting( mode );
for ( BoltServerAddress address; (address = servers.next()) != null; )
{
// refresh the routing table if needed
ensureRouting();
for ( BoltServerAddress address; (address = servers.next()) != null; )
try
{
try
{
return connections.acquire( address );
}
catch ( ServiceUnavailableException e )
{
log.error( "Failed to obtain a connection towards address " + address, e );
forget( address );
}
return connections.acquire( address );
}
catch ( ServiceUnavailableException e )
{
log.error( "Failed to obtain a connection towards address " + address, e );
forget( address );
}
// if we get here, we failed to connect to any server, so we will rebuild the routing table
}
throw new SessionExpiredException(
"Failed to obtain connection towards " + mode + " server. Known routing table is: " + routingTable );
}

private synchronized void forget( BoltServerAddress address )
Expand All @@ -121,23 +117,28 @@ private synchronized void forget( BoltServerAddress address )
connections.purge( address );
}

synchronized void ensureRouting() throws ServiceUnavailableException, ProtocolException
synchronized void ensureRouting( AccessMode mode )
{
if ( routingTable.isStale() )
if ( routingTable.isStaleFor( mode ) )
{
log.info( "Routing information is stale. %s", routingTable );
refreshRoutingTable();
}
}

// get a new routing table
ClusterComposition cluster = rediscovery.lookupClusterComposition( connections, routingTable );
Set<BoltServerAddress> removed = routingTable.update( cluster );
// purge connections to removed addresses
for ( BoltServerAddress address : removed )
{
connections.purge( address );
}
synchronized void refreshRoutingTable()
{
log.info( "Routing information is stale. %s", routingTable );

log.info( "Refreshed routing information. %s", routingTable );
// get a new routing table
ClusterComposition cluster = rediscovery.lookupClusterComposition( routingTable, connections );
Set<BoltServerAddress> removed = routingTable.update( cluster );
// purge connections to removed addresses
for ( BoltServerAddress address : removed )
{
connections.purge( address );
}

log.info( "Refreshed routing information. %s", routingTable );
}

private RoundRobinAddressSet addressSetFor( AccessMode mode )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.neo4j.driver.internal.cluster;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

Expand All @@ -42,6 +43,8 @@ public class Rediscovery
private final ClusterCompositionProvider provider;
private final HostNameResolver hostNameResolver;

private boolean useInitialRouter;

public Rediscovery( BoltServerAddress initialRouter, RoutingSettings settings, Clock clock, Logger logger,
ClusterCompositionProvider provider, HostNameResolver hostNameResolver )
{
Expand All @@ -53,9 +56,15 @@ public Rediscovery( BoltServerAddress initialRouter, RoutingSettings settings, C
this.hostNameResolver = hostNameResolver;
}

// Given the current routing table and connection pool, use the connection composition provider to fetch a new
// cluster composition, which would be used to update the routing table and connection pool
public ClusterComposition lookupClusterComposition( ConnectionPool connections, RoutingTable routingTable )
/**
* Given the current routing table and connection pool, use the connection composition provider to fetch a new
* cluster composition, which would be used to update the routing table and connection pool.
*
* @param routingTable current routing table.
* @param connections connection pool.
* @return new cluster composition.
*/
public ClusterComposition lookupClusterComposition( RoutingTable routingTable, ConnectionPool connections )
{
int failures = 0;

Expand All @@ -65,7 +74,7 @@ public ClusterComposition lookupClusterComposition( ConnectionPool connections,
sleep( waitTime );
start = clock.millis();

ClusterComposition composition = lookupClusterCompositionOnKnownRouters( connections, routingTable );
ClusterComposition composition = lookup( routingTable, connections );
if ( composition != null )
{
return composition;
Expand All @@ -78,11 +87,56 @@ public ClusterComposition lookupClusterComposition( ConnectionPool connections,
}
}

private ClusterComposition lookupClusterCompositionOnKnownRouters( ConnectionPool connections,
RoutingTable routingTable )
private ClusterComposition lookup( RoutingTable routingTable, ConnectionPool connections )
{
ClusterComposition composition;

if ( useInitialRouter )
{
composition = lookupOnInitialRouterThenOnKnownRouters( routingTable, connections );
useInitialRouter = false;
}
else
{
composition = lookupOnKnownRoutersThenOnInitialRouter( routingTable, connections );
}

if ( composition != null && !composition.hasWriters() )
{
useInitialRouter = true;
}

return composition;
}

private ClusterComposition lookupOnKnownRoutersThenOnInitialRouter( RoutingTable routingTable,
ConnectionPool connections )
{
Set<BoltServerAddress> seenServers = new HashSet<>();
ClusterComposition composition = lookupOnKnownRouters( routingTable, connections, seenServers );
if ( composition == null )
{
return lookupOnInitialRouter( routingTable, connections, seenServers );
}
return composition;
}

private ClusterComposition lookupOnInitialRouterThenOnKnownRouters( RoutingTable routingTable,
ConnectionPool connections )
{
Set<BoltServerAddress> seenServers = Collections.emptySet();
ClusterComposition composition = lookupOnInitialRouter( routingTable, connections, seenServers );
if ( composition == null )
{
return lookupOnKnownRouters( routingTable, connections, new HashSet<BoltServerAddress>() );
}
return composition;
}

private ClusterComposition lookupOnKnownRouters( RoutingTable routingTable, ConnectionPool connections,
Set<BoltServerAddress> seenServers )
{
int size = routingTable.routerSize();
Set<BoltServerAddress> triedServers = new HashSet<>();
for ( int i = 0; i < size; i++ )
{
BoltServerAddress address = routingTable.nextRouter();
Expand All @@ -91,22 +145,28 @@ private ClusterComposition lookupClusterCompositionOnKnownRouters( ConnectionPoo
break;
}

ClusterComposition composition = lookupClusterCompositionOnRouter( address, connections, routingTable );
ClusterComposition composition = lookupOnRouter( address, routingTable, connections );
if ( composition != null )
{
return composition;
}
else
{
triedServers.add( address );
seenServers.add( address );
}
}

return null;
}

private ClusterComposition lookupOnInitialRouter( RoutingTable routingTable,
ConnectionPool connections, Set<BoltServerAddress> triedServers )
{
Set<BoltServerAddress> ips = hostNameResolver.resolve( initialRouter );
ips.removeAll( triedServers );
for ( BoltServerAddress address : ips )
{
ClusterComposition composition = lookupClusterCompositionOnRouter( address, connections, routingTable );
ClusterComposition composition = lookupOnRouter( address, routingTable, connections );
if ( composition != null )
{
return composition;
Expand All @@ -116,8 +176,8 @@ private ClusterComposition lookupClusterCompositionOnKnownRouters( ConnectionPoo
return null;
}

private ClusterComposition lookupClusterCompositionOnRouter( BoltServerAddress routerAddress,
ConnectionPool connections, RoutingTable routingTable )
private ClusterComposition lookupOnRouter( BoltServerAddress routerAddress, RoutingTable routingTable,
ConnectionPool connections )
{
ClusterCompositionResponse response;
try ( Connection connection = connections.acquire( routerAddress ) )
Expand All @@ -139,11 +199,7 @@ private ClusterComposition lookupClusterCompositionOnRouter( BoltServerAddress r

ClusterComposition cluster = response.clusterComposition();
logger.info( "Got cluster composition %s", cluster );
if ( cluster.hasWriters() )
{
return cluster;
}
return null;
return cluster;
}

private void sleep( long millis )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
import java.util.Set;

import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.v1.AccessMode;

public interface RoutingTable
{
boolean isStale();
boolean isStaleFor( AccessMode mode );

Set<BoltServerAddress> update( ClusterComposition cluster );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public SocketConnectionPool( PoolSettings poolSettings, Connector connector, Clo
}

@Override
public PooledConnection acquire( final BoltServerAddress address )
public PooledConnection acquire( BoltServerAddress address )
{
assertNotClosed();
BlockingPooledConnectionQueue connectionQueue = pool( address );
Expand Down
Loading