Skip to content

Commit ff9e61b

Browse files
authored
Merge pull request #597 from zhenlineo/1.7-pool-closed
Suppress `pool was closed` errors
2 parents a86ea1d + 1a807f4 commit ff9e61b

File tree

7 files changed

+237
-127
lines changed

7 files changed

+237
-127
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@
4545
import org.neo4j.driver.v1.Logger;
4646
import org.neo4j.driver.v1.Logging;
4747
import org.neo4j.driver.v1.exceptions.ClientException;
48+
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
49+
50+
import static java.lang.String.format;
4851

4952
public class ConnectionPoolImpl implements ConnectionPool
5053
{
@@ -57,12 +60,13 @@ public class ConnectionPoolImpl implements ConnectionPool
5760
private final Logger log;
5861
private final MetricsListener metricsListener;
5962

60-
private final ConcurrentMap<BoltServerAddress,ChannelPool> pools = new ConcurrentHashMap<>();
63+
private final ConcurrentMap<BoltServerAddress,ExtendedChannelPool> pools = new ConcurrentHashMap<>();
6164
private final AtomicBoolean closed = new AtomicBoolean();
6265

6366
public ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, PoolSettings settings, MetricsListener metricsListener, Logging logging, Clock clock )
6467
{
65-
this( connector, bootstrap, new NettyChannelTracker( metricsListener, bootstrap.config().group().next(), logging ), settings, metricsListener, logging, clock );
68+
this( connector, bootstrap, new NettyChannelTracker( metricsListener, bootstrap.config().group().next(), logging ), settings, metricsListener, logging,
69+
clock );
6670
}
6771

6872
ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, NettyChannelTracker nettyChannelTracker,
@@ -84,7 +88,7 @@ public CompletionStage<Connection> acquire( BoltServerAddress address )
8488
log.trace( "Acquiring a connection from pool towards %s", address );
8589

8690
assertNotClosed();
87-
ChannelPool pool = getOrCreatePool( address );
91+
ExtendedChannelPool pool = getOrCreatePool( address );
8892

8993
ListenerEvent acquireEvent = metricsListener.createListenerEvent();
9094
metricsListener.beforeAcquiringOrCreating( address, acquireEvent );
@@ -94,7 +98,7 @@ public CompletionStage<Connection> acquire( BoltServerAddress address )
9498
{
9599
try
96100
{
97-
processAcquisitionError( address, error );
101+
processAcquisitionError( pool, address, error );
98102
assertNotClosed( address, channel, pool );
99103
Connection connection = new DirectConnection( channel, pool, clock, metricsListener );
100104

@@ -153,7 +157,7 @@ public CompletionStage<Void> close()
153157
try
154158
{
155159
nettyChannelTracker.prepareToCloseChannels();
156-
for ( Map.Entry<BoltServerAddress,ChannelPool> entry : pools.entrySet() )
160+
for ( Map.Entry<BoltServerAddress,ExtendedChannelPool> entry : pools.entrySet() )
157161
{
158162
BoltServerAddress address = entry.getKey();
159163
ChannelPool pool = entry.getValue();
@@ -178,9 +182,9 @@ public boolean isOpen( BoltServerAddress address )
178182
return pools.containsKey( address );
179183
}
180184

181-
private ChannelPool getOrCreatePool( BoltServerAddress address )
185+
private ExtendedChannelPool getOrCreatePool( BoltServerAddress address )
182186
{
183-
ChannelPool pool = pools.get( address );
187+
ExtendedChannelPool pool = pools.get( address );
184188
if ( pool != null )
185189
{
186190
return pool;
@@ -201,7 +205,7 @@ private ChannelPool getOrCreatePool( BoltServerAddress address )
201205
return pool;
202206
}
203207

204-
ChannelPool newPool( BoltServerAddress address )
208+
ExtendedChannelPool newPool( BoltServerAddress address )
205209
{
206210
return new NettyChannelPool( address, connector, bootstrap, nettyChannelTracker, channelHealthChecker,
207211
settings.connectionAcquisitionTimeout(), settings.maxConnectionPoolSize() );
@@ -212,7 +216,7 @@ private EventLoopGroup eventLoopGroup()
212216
return bootstrap.config().group();
213217
}
214218

215-
private void processAcquisitionError( BoltServerAddress serverAddress, Throwable error )
219+
private void processAcquisitionError( ExtendedChannelPool pool, BoltServerAddress serverAddress, Throwable error )
216220
{
217221
Throwable cause = Futures.completionExceptionCause( error );
218222
if ( cause != null )
@@ -226,6 +230,13 @@ private void processAcquisitionError( BoltServerAddress serverAddress, Throwable
226230
"Unable to acquire connection from the pool within configured maximum time of " +
227231
settings.connectionAcquisitionTimeout() + "ms" );
228232
}
233+
else if ( pool.isClosed() )
234+
{
235+
// There is a race condition where a thread tries to acquire a connection while the pool is closed by another concurrent thread.
236+
// Treat as failed to obtain connection for a direct driver. For a routing driver, this error should be retried.
237+
throw new ServiceUnavailableException( format( "Connection pool for server %s is closed while acquiring a connection.", serverAddress ),
238+
cause );
239+
}
229240
else
230241
{
231242
// some unknown error happened during connection acquisition, propagate it
@@ -258,4 +269,10 @@ public String toString()
258269
{
259270
return "ConnectionPoolImpl{" + "pools=" + pools + '}';
260271
}
272+
273+
// for testing only
274+
ExtendedChannelPool getPool( BoltServerAddress address )
275+
{
276+
return pools.get( address );
277+
}
261278
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright (c) 2002-2019 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.async.pool;
20+
21+
import io.netty.channel.pool.ChannelPool;
22+
23+
public interface ExtendedChannelPool extends ChannelPool
24+
{
25+
boolean isClosed();
26+
}

driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelPool.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,15 @@
2424
import io.netty.channel.pool.ChannelHealthChecker;
2525
import io.netty.channel.pool.FixedChannelPool;
2626

27+
import java.util.concurrent.atomic.AtomicBoolean;
28+
2729
import org.neo4j.driver.internal.BoltServerAddress;
2830
import org.neo4j.driver.internal.async.ChannelConnector;
2931
import org.neo4j.driver.internal.metrics.ListenerEvent;
3032

3133
import static java.util.Objects.requireNonNull;
3234

33-
public class NettyChannelPool extends FixedChannelPool
35+
public class NettyChannelPool extends FixedChannelPool implements ExtendedChannelPool
3436
{
3537
/**
3638
* Unlimited amount of parties are allowed to request channels from the pool.
@@ -44,6 +46,7 @@ public class NettyChannelPool extends FixedChannelPool
4446
private final BoltServerAddress address;
4547
private final ChannelConnector connector;
4648
private final NettyChannelTracker handler;
49+
private final AtomicBoolean closed = new AtomicBoolean( false );
4750

4851
public NettyChannelPool( BoltServerAddress address, ChannelConnector connector, Bootstrap bootstrap, NettyChannelTracker handler,
4952
ChannelHealthChecker healthCheck, long acquireTimeoutMillis, int maxConnections )
@@ -76,4 +79,18 @@ protected ChannelFuture connectChannel( Bootstrap bootstrap )
7679
} );
7780
return channelFuture;
7881
}
82+
83+
@Override
84+
public void close()
85+
{
86+
if ( closed.compareAndSet( false, true ) )
87+
{
88+
super.close();
89+
}
90+
}
91+
92+
public boolean isClosed()
93+
{
94+
return closed.get();
95+
}
7996
}

driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import io.netty.util.concurrent.EventExecutorGroup;
2222

2323
import java.net.UnknownHostException;
24-
import java.util.Arrays;
2524
import java.util.HashSet;
2625
import java.util.List;
2726
import java.util.Set;
@@ -38,12 +37,10 @@
3837
import org.neo4j.driver.v1.Logger;
3938
import org.neo4j.driver.v1.exceptions.SecurityException;
4039
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
41-
import org.neo4j.driver.v1.net.ServerAddress;
4240
import org.neo4j.driver.v1.net.ServerAddressResolver;
4341

4442
import static java.lang.String.format;
4543
import static java.util.Collections.emptySet;
46-
import static java.util.Collections.singleton;
4744
import static java.util.concurrent.CompletableFuture.completedFuture;
4845
import static java.util.stream.Collectors.toList;
4946
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
@@ -265,7 +262,7 @@ private ClusterComposition handleRoutingProcedureError( Throwable error, Routing
265262
else
266263
{
267264
// connection turned out to be broken
268-
logger.error( format( "Failed to connect to routing server '%s'.", routerAddress ), error );
265+
logger.info( format( "Failed to connect to routing server '%s'.", routerAddress ), error );
269266
routingTable.forget( routerAddress );
270267
return null;
271268
}

driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ private synchronized void freshClusterCompositionFetched( ClusterComposition com
190190

191191
private synchronized void clusterCompositionLookupFailed( Throwable error )
192192
{
193+
log.error( "Failed to update routing table. Current routing table: " + routingTable, error );
193194
CompletableFuture<RoutingTable> routingTableFuture = refreshRoutingTableFuture;
194195
refreshRoutingTableFuture = null;
195196
routingTableFuture.completeExceptionally( error );

driver/src/test/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImplIT.java

Lines changed: 11 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,11 @@
1919
package org.neo4j.driver.internal.async.pool;
2020

2121
import io.netty.bootstrap.Bootstrap;
22-
import io.netty.channel.Channel;
23-
import io.netty.channel.pool.ChannelPool;
24-
import io.netty.util.concurrent.ImmediateEventExecutor;
2522
import org.junit.jupiter.api.AfterEach;
2623
import org.junit.jupiter.api.BeforeEach;
2724
import org.junit.jupiter.api.Test;
2825
import org.junit.jupiter.api.extension.RegisterExtension;
2926

30-
import java.util.HashMap;
31-
import java.util.HashSet;
32-
import java.util.Map;
33-
3427
import org.neo4j.driver.internal.BoltServerAddress;
3528
import org.neo4j.driver.internal.ConnectionSettings;
3629
import org.neo4j.driver.internal.async.BootstrapFactory;
@@ -43,32 +36,21 @@
4336
import org.neo4j.driver.v1.util.DatabaseExtension;
4437
import org.neo4j.driver.v1.util.ParallelizableIT;
4538

46-
import static java.util.Arrays.asList;
47-
import static java.util.Collections.singleton;
39+
import static org.hamcrest.Matchers.containsString;
40+
import static org.hamcrest.Matchers.instanceOf;
4841
import static org.hamcrest.Matchers.startsWith;
4942
import static org.hamcrest.junit.MatcherAssert.assertThat;
5043
import static org.junit.jupiter.api.Assertions.assertNotNull;
5144
import static org.junit.jupiter.api.Assertions.assertNull;
5245
import static org.junit.jupiter.api.Assertions.assertThrows;
5346
import static org.junit.jupiter.api.Assertions.assertTrue;
54-
import static org.mockito.Mockito.doReturn;
55-
import static org.mockito.Mockito.mock;
56-
import static org.mockito.Mockito.never;
57-
import static org.mockito.Mockito.verify;
58-
import static org.mockito.Mockito.verifyZeroInteractions;
59-
import static org.mockito.Mockito.when;
60-
import static org.neo4j.driver.internal.BoltServerAddress.LOCAL_DEFAULT;
6147
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
6248
import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS;
6349
import static org.neo4j.driver.v1.util.TestUtil.await;
6450

6551
@ParallelizableIT
6652
class ConnectionPoolImplIT
6753
{
68-
private static final BoltServerAddress ADDRESS_1 = new BoltServerAddress( "server:1" );
69-
private static final BoltServerAddress ADDRESS_2 = new BoltServerAddress( "server:2" );
70-
private static final BoltServerAddress ADDRESS_3 = new BoltServerAddress( "server:3" );
71-
7254
@RegisterExtension
7355
static final DatabaseExtension neo4j = new DatabaseExtension();
7456

@@ -132,71 +114,16 @@ void shouldNotCloseWhenClosed()
132114
}
133115

134116
@Test
135-
void shouldDoNothingWhenRetainOnEmptyPool()
136-
{
137-
NettyChannelTracker nettyChannelTracker = mock( NettyChannelTracker.class );
138-
TestConnectionPool pool = new TestConnectionPool( nettyChannelTracker );
139-
140-
pool.retainAll( singleton( LOCAL_DEFAULT ) );
141-
142-
verifyZeroInteractions( nettyChannelTracker );
143-
}
144-
145-
@Test
146-
void shouldRetainSpecifiedAddresses()
147-
{
148-
NettyChannelTracker nettyChannelTracker = mock( NettyChannelTracker.class );
149-
TestConnectionPool pool = new TestConnectionPool( nettyChannelTracker );
150-
151-
pool.acquire( ADDRESS_1 );
152-
pool.acquire( ADDRESS_2 );
153-
pool.acquire( ADDRESS_3 );
154-
155-
pool.retainAll( new HashSet<>( asList( ADDRESS_1, ADDRESS_2, ADDRESS_3 ) ) );
156-
for ( ChannelPool channelPool : pool.channelPoolsByAddress.values() )
157-
{
158-
verify( channelPool, never() ).close();
159-
}
160-
}
161-
162-
@Test
163-
void shouldClosePoolsWhenRetaining()
164-
{
165-
NettyChannelTracker nettyChannelTracker = mock( NettyChannelTracker.class );
166-
TestConnectionPool pool = new TestConnectionPool( nettyChannelTracker );
167-
168-
pool.acquire( ADDRESS_1 );
169-
pool.acquire( ADDRESS_2 );
170-
pool.acquire( ADDRESS_3 );
171-
172-
when( nettyChannelTracker.inUseChannelCount( ADDRESS_1 ) ).thenReturn( 2 );
173-
when( nettyChannelTracker.inUseChannelCount( ADDRESS_2 ) ).thenReturn( 0 );
174-
when( nettyChannelTracker.inUseChannelCount( ADDRESS_3 ) ).thenReturn( 3 );
175-
176-
pool.retainAll( new HashSet<>( asList( ADDRESS_1, ADDRESS_3 ) ) );
177-
verify( pool.getPool( ADDRESS_1 ), never() ).close();
178-
verify( pool.getPool( ADDRESS_2 ) ).close();
179-
verify( pool.getPool( ADDRESS_3 ), never() ).close();
180-
}
181-
182-
@Test
183-
void shouldNotClosePoolsWithActiveConnectionsWhenRetaining()
117+
void shouldFailToAcquireConnectionWhenPoolIsClosed()
184118
{
185-
NettyChannelTracker nettyChannelTracker = mock( NettyChannelTracker.class );
186-
TestConnectionPool pool = new TestConnectionPool( nettyChannelTracker );
187-
188-
pool.acquire( ADDRESS_1 );
189-
pool.acquire( ADDRESS_2 );
190-
pool.acquire( ADDRESS_3 );
191-
192-
when( nettyChannelTracker.inUseChannelCount( ADDRESS_1 ) ).thenReturn( 1 );
193-
when( nettyChannelTracker.inUseChannelCount( ADDRESS_2 ) ).thenReturn( 42 );
194-
when( nettyChannelTracker.inUseChannelCount( ADDRESS_3 ) ).thenReturn( 0 );
195-
196-
pool.retainAll( singleton( ADDRESS_2 ) );
197-
verify( pool.getPool( ADDRESS_1 ), never() ).close();
198-
verify( pool.getPool( ADDRESS_2 ), never() ).close();
199-
verify( pool.getPool( ADDRESS_3 ) ).close();
119+
await( pool.acquire( neo4j.address() ) );
120+
ExtendedChannelPool channelPool = this.pool.getPool( neo4j.address() );
121+
channelPool.close();
122+
ServiceUnavailableException error =
123+
assertThrows( ServiceUnavailableException.class, () -> await( pool.acquire( neo4j.address() ) ) );
124+
assertThat( error.getMessage(), containsString( "closed while acquiring a connection" ) );
125+
assertThat( error.getCause(), instanceOf( IllegalStateException.class ) );
126+
assertThat( error.getCause().getMessage(), containsString( "FixedChannelPooled was closed" ) );
200127
}
201128

202129
private ConnectionPoolImpl newPool() throws Exception
@@ -209,37 +136,8 @@ private ConnectionPoolImpl newPool() throws Exception
209136
Bootstrap bootstrap = BootstrapFactory.newBootstrap( 1 );
210137
return new ConnectionPoolImpl( connector, bootstrap, poolSettings, DEV_NULL_METRICS, DEV_NULL_LOGGING, clock );
211138
}
212-
213139
private static PoolSettings newSettings()
214140
{
215141
return new PoolSettings( 10, 5000, -1, -1 );
216142
}
217-
218-
private static class TestConnectionPool extends ConnectionPoolImpl
219-
{
220-
final Map<BoltServerAddress,ChannelPool> channelPoolsByAddress = new HashMap<>();
221-
222-
TestConnectionPool( NettyChannelTracker nettyChannelTracker )
223-
{
224-
super( mock( ChannelConnector.class ), mock( Bootstrap.class ), nettyChannelTracker, newSettings(),
225-
DEV_NULL_METRICS, DEV_NULL_LOGGING, new FakeClock() );
226-
}
227-
228-
ChannelPool getPool( BoltServerAddress address )
229-
{
230-
ChannelPool pool = channelPoolsByAddress.get( address );
231-
assertNotNull( pool );
232-
return pool;
233-
}
234-
235-
@Override
236-
ChannelPool newPool( BoltServerAddress address )
237-
{
238-
ChannelPool channelPool = mock( ChannelPool.class );
239-
Channel channel = mock( Channel.class );
240-
doReturn( ImmediateEventExecutor.INSTANCE.newSucceededFuture( channel ) ).when( channelPool ).acquire();
241-
channelPoolsByAddress.put( address, channelPool );
242-
return channelPool;
243-
}
244-
}
245143
}

0 commit comments

Comments
 (0)