Skip to content

Decouple load balancing logic from address set #384

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,39 +20,23 @@

import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import org.neo4j.driver.internal.net.BoltServerAddress;

public class RoundRobinAddressSet
public class AddressSet
{
private static final BoltServerAddress[] NONE = {};
private final AtomicInteger offset = new AtomicInteger();
private volatile BoltServerAddress[] addresses = NONE;

public int size()
{
return addresses.length;
}
private volatile BoltServerAddress[] addresses = NONE;

public BoltServerAddress next()
public BoltServerAddress[] toArray()
{
BoltServerAddress[] addresses = this.addresses;
if ( addresses.length == 0 )
{
return null;
}
return addresses[next( addresses.length )];
return addresses;
}

int next( int divisor )
public int size()
{
int index = offset.getAndIncrement();
for ( ; index == Integer.MAX_VALUE; index = offset.getAndIncrement() )
{
offset.compareAndSet( Integer.MIN_VALUE, index % divisor );
}
return index % divisor;
return addresses.length;
}

public synchronized void update( Set<BoltServerAddress> addresses, Set<BoltServerAddress> removed )
Expand Down Expand Up @@ -132,12 +116,6 @@ public synchronized void remove( BoltServerAddress address )
@Override
public String toString()
{
return "RoundRobinAddressSet=" + Arrays.toString( addresses );
}

/** breaking encapsulation in order to perform white-box testing of boundary case */
void setOffset( int target )
{
offset.set( target );
return "AddressSet=" + Arrays.toString( addresses );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ public class ClusterRoutingTable implements RoutingTable

private final Clock clock;
private volatile long expirationTimeout;
private final RoundRobinAddressSet readers;
private final RoundRobinAddressSet writers;
private final RoundRobinAddressSet routers;
private final AddressSet readers;
private final AddressSet writers;
private final AddressSet routers;

public ClusterRoutingTable( Clock clock, BoltServerAddress... routingAddresses )
{
Expand All @@ -51,9 +51,9 @@ private ClusterRoutingTable( Clock clock )
this.clock = clock;
this.expirationTimeout = clock.millis() - 1;

this.readers = new RoundRobinAddressSet();
this.writers = new RoundRobinAddressSet();
this.routers = new RoundRobinAddressSet();
this.readers = new AddressSet();
this.writers = new AddressSet();
this.routers = new AddressSet();
}

@Override
Expand Down Expand Up @@ -85,27 +85,21 @@ public synchronized void forget( BoltServerAddress address )
}

@Override
public RoundRobinAddressSet readers()
public AddressSet readers()
{
return readers;
}

@Override
public RoundRobinAddressSet writers()
public AddressSet writers()
{
return writers;
}

@Override
public BoltServerAddress nextRouter()
public AddressSet routers()
{
return routers.next();
}

@Override
public int routerSize()
{
return routers.size();
return routers;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class LoadBalancer implements ConnectionProvider, RoutingErrorHandler, Au
private final ConnectionPool connections;
private final RoutingTable routingTable;
private final Rediscovery rediscovery;
private final LoadBalancingStrategy loadBalancingStrategy;
private final Logger log;

public LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings, ConnectionPool connections,
Expand All @@ -59,6 +60,7 @@ private LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings,
this.connections = connections;
this.routingTable = routingTable;
this.rediscovery = rediscovery;
this.loadBalancingStrategy = new RoundRobinLoadBalancingStrategy();
this.log = log;

refreshRoutingTable();
Expand All @@ -67,7 +69,7 @@ private LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings,
@Override
public PooledConnection acquireConnection( AccessMode mode )
{
RoundRobinAddressSet addressSet = addressSetFor( mode );
AddressSet addressSet = addressSetFor( mode );
PooledConnection connection = acquireConnection( mode, addressSet );
return new RoutingPooledConnection( connection, this, mode );
}
Expand All @@ -90,10 +92,10 @@ public void close() throws Exception
connections.close();
}

private PooledConnection acquireConnection( AccessMode mode, RoundRobinAddressSet servers )
private PooledConnection acquireConnection( AccessMode mode, AddressSet servers )
{
ensureRouting( mode );
for ( BoltServerAddress address; (address = servers.next()) != null; )
for ( BoltServerAddress address; (address = selectAddress( mode, servers )) != null; )
{
try
{
Expand Down Expand Up @@ -141,7 +143,7 @@ synchronized void refreshRoutingTable()
log.info( "Refreshed routing information. %s", routingTable );
}

private RoundRobinAddressSet addressSetFor( AccessMode mode )
private AddressSet addressSetFor( AccessMode mode )
{
switch ( mode )
{
Expand All @@ -150,7 +152,22 @@ private RoundRobinAddressSet addressSetFor( AccessMode mode )
case WRITE:
return routingTable.writers();
default:
throw new IllegalArgumentException( "Mode '" + mode + "' is not supported" );
throw unknownMode( mode );
}
}

private BoltServerAddress selectAddress( AccessMode mode, AddressSet servers )
{
BoltServerAddress[] addresses = servers.toArray();

switch ( mode )
{
case READ:
return loadBalancingStrategy.selectReader( addresses );
case WRITE:
return loadBalancingStrategy.selectWriter( addresses );
default:
throw unknownMode( mode );
}
}

Expand All @@ -161,4 +178,9 @@ private static Rediscovery createRediscovery( BoltServerAddress initialRouter, R
new RoutingProcedureClusterCompositionProvider( clock, log, settings );
return new Rediscovery( initialRouter, settings, clock, log, clusterComposition, new DnsResolver( log ) );
}

private static RuntimeException unknownMode( AccessMode mode )
{
return new IllegalArgumentException( "Mode '" + mode + "' is not supported" );
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.cluster;

import org.neo4j.driver.internal.net.BoltServerAddress;

public interface LoadBalancingStrategy
{
BoltServerAddress selectReader( BoltServerAddress[] knownReaders );

BoltServerAddress selectWriter( BoltServerAddress[] knownWriters );
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,10 @@ private ClusterComposition lookupOnInitialRouterThenOnKnownRouters( RoutingTable
private ClusterComposition lookupOnKnownRouters( RoutingTable routingTable, ConnectionPool connections,
Set<BoltServerAddress> seenServers )
{
int size = routingTable.routerSize();
for ( int i = 0; i < size; i++ )
{
BoltServerAddress address = routingTable.nextRouter();
if ( address == null )
{
break;
}
BoltServerAddress[] addresses = routingTable.routers().toArray();

for ( BoltServerAddress address : addresses )
{
ClusterComposition composition = lookupOnRouter( address, routingTable, connections );
if ( composition != null )
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.cluster;

import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinArrayIndex
{
private final AtomicInteger offset;

RoundRobinArrayIndex()
{
this( 0 );
}

// only for testing
RoundRobinArrayIndex( int initialOffset )
{
this.offset = new AtomicInteger( initialOffset );
}

public int next( int arrayLength )
{
if ( arrayLength == 0 )
{
return -1;
}

int nextOffset;
while ( (nextOffset = offset.getAndIncrement()) < 0 )
{
// overflow, try resetting back to zero
offset.compareAndSet( nextOffset + 1, 0 );
}
return nextOffset % arrayLength;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.cluster;

import org.neo4j.driver.internal.net.BoltServerAddress;

public class RoundRobinLoadBalancingStrategy implements LoadBalancingStrategy
{
private final RoundRobinArrayIndex readersIndex = new RoundRobinArrayIndex();
private final RoundRobinArrayIndex writersIndex = new RoundRobinArrayIndex();

@Override
public BoltServerAddress selectReader( BoltServerAddress[] knownReaders )
{
return select( knownReaders, readersIndex );
}

@Override
public BoltServerAddress selectWriter( BoltServerAddress[] knownWriters )
{
return select( knownWriters, writersIndex );
}

private BoltServerAddress select( BoltServerAddress[] addresses, RoundRobinArrayIndex roundRobinIndex )
{
int length = addresses.length;
if ( length == 0 )
{
return null;
}
int index = roundRobinIndex.next( length );
return addresses[index];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,11 @@ public interface RoutingTable

void forget( BoltServerAddress address );

RoundRobinAddressSet readers();
AddressSet readers();

RoundRobinAddressSet writers();
AddressSet writers();

BoltServerAddress nextRouter();

int routerSize();
AddressSet routers();

void removeWriter( BoltServerAddress toRemove );
}
Original file line number Diff line number Diff line change
Expand Up @@ -808,11 +808,11 @@ public void shouldRetryReadTransactionAndPerformRediscoveryUntilSuccess() throws
@Test
public void shouldRetryWriteTransactionAndPerformRediscoveryUntilSuccess() throws Exception
{
StubServer router1 = StubServer.start( "acquire_endpoints.script", 9010 );
StubServer brokenWriter1 = StubServer.start( "dead_write_server.script", 9007 );
StubServer router1 = StubServer.start( "discover_servers.script", 9010 );
StubServer brokenWriter1 = StubServer.start( "dead_write_server.script", 9001 );
StubServer router2 = StubServer.start( "acquire_endpoints.script", 9002 );
StubServer brokenWriter2 = StubServer.start( "dead_write_server.script", 9008 );
StubServer router2 = StubServer.start( "discover_servers.script", 9002 );
StubServer writer = StubServer.start( "write_server.script", 9001 );
StubServer writer = StubServer.start( "write_server.script", 9007 );

try ( Driver driver = newDriverWithSleeplessClock( "bolt+routing://127.0.0.1:9010" );
Session session = driver.session() )
Expand All @@ -827,9 +827,9 @@ public void shouldRetryWriteTransactionAndPerformRediscoveryUntilSuccess() throw
{
assertEquals( 0, router1.exitStatus() );
assertEquals( 0, brokenWriter1.exitStatus() );
assertEquals( 0, brokenWriter2.exitStatus() );
assertEquals( 0, router2.exitStatus() );
assertEquals( 0, writer.exitStatus() );
assertEquals( 0, brokenWriter2.exitStatus() );
}
}

Expand Down
Loading