Skip to content

Commit a1ba224

Browse files
author
Zhen Li
committed
Remove routing table when it is stale for too long.
1 parent e19076b commit a1ba224

20 files changed

+1014
-394
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright (c) 2002-2019 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.async.pool;
20+
21+
import io.netty.channel.Channel;
22+
23+
import org.neo4j.driver.internal.spi.Connection;
24+
25+
public interface ConnectionFactory
26+
{
27+
Connection createConnection( Channel channel, ExtendedChannelPool pool );
28+
}

driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import org.neo4j.driver.exceptions.ServiceUnavailableException;
4141
import org.neo4j.driver.internal.BoltServerAddress;
4242
import org.neo4j.driver.internal.async.connection.ChannelConnector;
43-
import org.neo4j.driver.internal.async.NetworkConnection;
4443
import org.neo4j.driver.internal.metrics.ListenerEvent;
4544
import org.neo4j.driver.internal.metrics.MetricsListener;
4645
import org.neo4j.driver.internal.spi.Connection;
@@ -57,32 +56,33 @@ public class ConnectionPoolImpl implements ConnectionPool
5756
private final NettyChannelTracker nettyChannelTracker;
5857
private final NettyChannelHealthChecker channelHealthChecker;
5958
private final PoolSettings settings;
60-
private final Clock clock;
6159
private final Logger log;
6260
private final MetricsListener metricsListener;
6361
private final boolean ownsEventLoopGroup;
6462

6563
private final ConcurrentMap<BoltServerAddress,ExtendedChannelPool> pools = new ConcurrentHashMap<>();
6664
private final AtomicBoolean closed = new AtomicBoolean();
65+
private final ConnectionFactory connectionFactory;
6766

68-
public ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, PoolSettings settings, MetricsListener metricsListener, Logging logging, Clock clock, boolean ownsEventLoopGroup )
67+
public ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, PoolSettings settings, MetricsListener metricsListener, Logging logging,
68+
Clock clock, boolean ownsEventLoopGroup )
6969
{
70-
71-
this( connector, bootstrap, new NettyChannelTracker( metricsListener, bootstrap.config().group().next(), logging ), settings, metricsListener, logging, clock, ownsEventLoopGroup );
70+
this( connector, bootstrap, new NettyChannelTracker( metricsListener, bootstrap.config().group().next(), logging ), settings, metricsListener, logging,
71+
clock, ownsEventLoopGroup, new NetworkConnectionFactory( clock, metricsListener ) );
7272
}
7373

74-
ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, NettyChannelTracker nettyChannelTracker,
75-
PoolSettings settings, MetricsListener metricsListener, Logging logging, Clock clock, boolean ownsEventLoopGroup )
74+
public ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, NettyChannelTracker nettyChannelTracker, PoolSettings settings,
75+
MetricsListener metricsListener, Logging logging, Clock clock, boolean ownsEventLoopGroup, ConnectionFactory connectionFactory )
7676
{
7777
this.connector = connector;
7878
this.bootstrap = bootstrap;
7979
this.nettyChannelTracker = nettyChannelTracker;
8080
this.channelHealthChecker = new NettyChannelHealthChecker( settings, clock, logging );
8181
this.settings = settings;
8282
this.metricsListener = metricsListener;
83-
this.clock = clock;
8483
this.log = logging.getLog( ConnectionPool.class.getSimpleName() );
8584
this.ownsEventLoopGroup = ownsEventLoopGroup;
85+
this.connectionFactory = connectionFactory;
8686
}
8787

8888
@Override
@@ -103,7 +103,7 @@ public CompletionStage<Connection> acquire( BoltServerAddress address )
103103
{
104104
processAcquisitionError( pool, address, error );
105105
assertNotClosed( address, channel, pool );
106-
Connection connection = new NetworkConnection( channel, pool, clock, metricsListener );
106+
Connection connection = connectionFactory.createConnection( channel, pool );
107107

108108
metricsListener.afterAcquiredOrCreated( address, acquireEvent );
109109
return connection;
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright (c) 2002-2019 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.async.pool;
20+
21+
import io.netty.channel.Channel;
22+
23+
import org.neo4j.driver.internal.async.NetworkConnection;
24+
import org.neo4j.driver.internal.metrics.MetricsListener;
25+
import org.neo4j.driver.internal.spi.Connection;
26+
import org.neo4j.driver.internal.util.Clock;
27+
28+
public class NetworkConnectionFactory implements ConnectionFactory
29+
{
30+
private final Clock clock;
31+
private final MetricsListener metricsListener;
32+
33+
public NetworkConnectionFactory( Clock clock, MetricsListener metricsListener )
34+
{
35+
this.clock = clock;
36+
this.metricsListener = metricsListener;
37+
}
38+
39+
@Override
40+
public Connection createConnection( Channel channel, ExtendedChannelPool pool )
41+
{
42+
return new NetworkConnection( channel, pool, clock, metricsListener );
43+
}
44+
}

driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterComposition.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,24 +21,16 @@
2121
import java.util.LinkedHashSet;
2222
import java.util.Objects;
2323
import java.util.Set;
24+
import java.util.function.Function;
2425

25-
import org.neo4j.driver.internal.BoltServerAddress;
2626
import org.neo4j.driver.Record;
2727
import org.neo4j.driver.Value;
28-
import java.util.function.Function;
28+
import org.neo4j.driver.internal.BoltServerAddress;
2929

3030
public final class ClusterComposition
3131
{
3232
private static final long MAX_TTL = Long.MAX_VALUE / 1000L;
33-
private static final Function<Value,BoltServerAddress> OF_BoltServerAddress =
34-
new Function<Value,BoltServerAddress>()
35-
{
36-
@Override
37-
public BoltServerAddress apply( Value value )
38-
{
39-
return new BoltServerAddress( value.asString() );
40-
}
41-
};
33+
private static final Function<Value,BoltServerAddress> OF_BoltServerAddress = value -> new BoltServerAddress( value.asString() );
4234

4335
private final Set<BoltServerAddress> readers;
4436
private final Set<BoltServerAddress> writers;

driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,12 @@ public boolean isStaleFor( AccessMode mode )
6767
mode == AccessMode.WRITE && writers.size() == 0;
6868
}
6969

70+
@Override
71+
public boolean isStale( long staleRoutingTableTimeout )
72+
{
73+
return expirationTimeout + staleRoutingTableTimeout < clock.millis();
74+
}
75+
7076
@Override
7177
public synchronized void update( ClusterComposition cluster )
7278
{

0 commit comments

Comments
 (0)