Skip to content

Commit 9f27b11

Browse files
authored
Merge pull request #433 from lutovich/1.5-acquisition-timeout-err
Improve connection acquisition timeout error
2 parents bf86eaa + f68606b commit 9f27b11

File tree

8 files changed

+213
-17
lines changed

8 files changed

+213
-17
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@
2424
import io.netty.channel.pool.ChannelPool;
2525
import io.netty.util.concurrent.Future;
2626

27+
import java.util.concurrent.CompletionException;
2728
import java.util.concurrent.CompletionStage;
2829
import java.util.concurrent.ConcurrentHashMap;
2930
import java.util.concurrent.ConcurrentMap;
31+
import java.util.concurrent.TimeoutException;
3032
import java.util.concurrent.atomic.AtomicBoolean;
3133

3234
import org.neo4j.driver.internal.async.BoltServerAddress;
@@ -38,6 +40,7 @@
3840
import org.neo4j.driver.internal.util.Futures;
3941
import org.neo4j.driver.v1.Logger;
4042
import org.neo4j.driver.v1.Logging;
43+
import org.neo4j.driver.v1.exceptions.ClientException;
4144

4245
public class ConnectionPoolImpl implements ConnectionPool
4346
{
@@ -73,8 +76,9 @@ public CompletionStage<Connection> acquire( final BoltServerAddress address )
7376
ChannelPool pool = getOrCreatePool( address );
7477
Future<Channel> connectionFuture = pool.acquire();
7578

76-
return Futures.asCompletionStage( connectionFuture ).thenApply( channel ->
79+
return Futures.asCompletionStage( connectionFuture ).handle( ( channel, error ) ->
7780
{
81+
processAcquisitionError( error );
7882
assertNotClosed( address, channel, pool );
7983
return new NettyConnection( channel, pool, clock );
8084
} );
@@ -160,6 +164,27 @@ private EventLoopGroup eventLoopGroup()
160164
return bootstrap.config().group();
161165
}
162166

167+
private void processAcquisitionError( Throwable error )
168+
{
169+
Throwable cause = Futures.completionErrorCause( error );
170+
if ( cause != null )
171+
{
172+
if ( cause instanceof TimeoutException )
173+
{
174+
// NettyChannelPool returns future failed with TimeoutException if acquire operation takes more than
175+
// configured time, translate this exception to a prettier one and re-throw
176+
throw new ClientException(
177+
"Unable to acquire connection from the pool within configured maximum time of " +
178+
settings.connectionAcquisitionTimeout() + "ms" );
179+
}
180+
else
181+
{
182+
// some unknown error happened during connection acquisition, propagate it
183+
throw new CompletionException( cause );
184+
}
185+
}
186+
}
187+
163188
private void assertNotClosed()
164189
{
165190
if ( closed.get() )

driver/src/test/java/org/neo4j/driver/internal/util/Matchers.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@
2929
import org.neo4j.driver.internal.InternalDriver;
3030
import org.neo4j.driver.internal.SessionFactory;
3131
import org.neo4j.driver.internal.SessionFactoryImpl;
32+
import org.neo4j.driver.internal.async.BoltServerAddress;
3233
import org.neo4j.driver.internal.cluster.AddressSet;
3334
import org.neo4j.driver.internal.cluster.RoutingTable;
3435
import org.neo4j.driver.internal.cluster.loadbalancing.LoadBalancer;
35-
import org.neo4j.driver.internal.async.BoltServerAddress;
3636
import org.neo4j.driver.internal.spi.ConnectionProvider;
3737
import org.neo4j.driver.v1.Driver;
3838
import org.neo4j.driver.v1.exceptions.ClientException;
@@ -236,6 +236,30 @@ public void describeTo( Description description )
236236
};
237237
}
238238

239+
public static Matcher<Throwable> connectionAcquisitionTimeoutError( int timeoutMillis )
240+
{
241+
return new TypeSafeMatcher<Throwable>()
242+
{
243+
@Override
244+
protected boolean matchesSafely( Throwable error )
245+
{
246+
if ( error instanceof ClientException )
247+
{
248+
String expectedMessage = "Unable to acquire connection from the pool within " +
249+
"configured maximum time of " + timeoutMillis + "ms";
250+
return expectedMessage.equals( error.getMessage() );
251+
}
252+
return false;
253+
}
254+
255+
@Override
256+
public void describeTo( Description description )
257+
{
258+
description.appendText( "acquisition timeout error with " + timeoutMillis + "ms" );
259+
}
260+
};
261+
}
262+
239263
private static boolean contains( AddressSet set, BoltServerAddress address )
240264
{
241265
BoltServerAddress[] addresses = set.toArray();

driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java

Lines changed: 56 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.neo4j.driver.v1.integration;
2020

21+
import org.junit.AfterClass;
2122
import org.junit.Rule;
2223
import org.junit.Test;
2324

@@ -35,7 +36,6 @@
3536
import java.util.concurrent.TimeoutException;
3637

3738
import org.neo4j.driver.internal.cluster.RoutingSettings;
38-
import org.neo4j.driver.internal.logging.DevNullLogger;
3939
import org.neo4j.driver.internal.retry.RetrySettings;
4040
import org.neo4j.driver.internal.util.ChannelTrackingDriverFactory;
4141
import org.neo4j.driver.internal.util.FakeClock;
@@ -44,8 +44,6 @@
4444
import org.neo4j.driver.v1.Config;
4545
import org.neo4j.driver.v1.Driver;
4646
import org.neo4j.driver.v1.GraphDatabase;
47-
import org.neo4j.driver.v1.Logger;
48-
import org.neo4j.driver.v1.Logging;
4947
import org.neo4j.driver.v1.Record;
5048
import org.neo4j.driver.v1.Session;
5149
import org.neo4j.driver.v1.StatementResult;
@@ -64,9 +62,11 @@
6462
import org.neo4j.driver.v1.util.cc.ClusterMemberRole;
6563
import org.neo4j.driver.v1.util.cc.ClusterRule;
6664

65+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
6766
import static java.util.concurrent.TimeUnit.SECONDS;
6867
import static org.hamcrest.Matchers.containsString;
6968
import static org.hamcrest.Matchers.instanceOf;
69+
import static org.hamcrest.Matchers.is;
7070
import static org.hamcrest.Matchers.startsWith;
7171
import static org.junit.Assert.assertEquals;
7272
import static org.junit.Assert.assertNotEquals;
@@ -76,6 +76,7 @@
7676
import static org.junit.Assert.fail;
7777
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
7878
import static org.neo4j.driver.internal.util.Futures.getBlocking;
79+
import static org.neo4j.driver.internal.util.Matchers.connectionAcquisitionTimeoutError;
7980
import static org.neo4j.driver.v1.Values.parameters;
8081

8182
public class CausalClusteringIT
@@ -85,6 +86,12 @@ public class CausalClusteringIT
8586
@Rule
8687
public final ClusterRule clusterRule = new ClusterRule();
8788

89+
@AfterClass
90+
public static void stopSharedCluster()
91+
{
92+
ClusterRule.stopSharedCluster();
93+
}
94+
8895
@Test
8996
public void shouldExecuteReadAndWritesWhenDriverSuppliedWithAddressOfLeader() throws Exception
9097
{
@@ -532,6 +539,46 @@ public void shouldNotReuseReadConnectionForWriteTransaction()
532539
}
533540
}
534541

542+
@Test
543+
public void shouldRespectMaxConnectionPoolSizePerClusterMember()
544+
{
545+
Cluster cluster = clusterRule.getCluster();
546+
ClusterMember leader = cluster.leader();
547+
548+
Config config = Config.build()
549+
.withMaxConnectionPoolSize( 2 )
550+
.withConnectionAcquisitionTimeout( 42, MILLISECONDS )
551+
.withLogging( DEV_NULL_LOGGING )
552+
.toConfig();
553+
554+
try ( Driver driver = createDriver( leader.getRoutingUri(), config ) )
555+
{
556+
Session writeSession1 = driver.session( AccessMode.WRITE );
557+
writeSession1.beginTransaction();
558+
559+
Session writeSession2 = driver.session( AccessMode.WRITE );
560+
writeSession2.beginTransaction();
561+
562+
// should not be possible to acquire more connections towards leader because limit is 2
563+
Session writeSession3 = driver.session( AccessMode.WRITE );
564+
try
565+
{
566+
writeSession3.beginTransaction();
567+
fail( "Exception expected" );
568+
}
569+
catch ( ClientException e )
570+
{
571+
assertThat( e, is( connectionAcquisitionTimeoutError( 42 ) ) );
572+
}
573+
574+
// should be possible to acquire new connection towards read server
575+
// it's a different machine, not leader, so different max connection pool size limit applies
576+
Session readSession = driver.session( AccessMode.READ );
577+
Record record = readSession.readTransaction( tx -> tx.run( "RETURN 1" ).single() );
578+
assertEquals( 1, record.get( 0 ).asInt() );
579+
}
580+
}
581+
535582
private CompletionStage<List<RecordAndSummary>> combineCursors( StatementResultCursor cursor1,
536583
StatementResultCursor cursor2 )
537584
{
@@ -702,19 +749,15 @@ else if ( role == ClusterMemberRole.READ_REPLICA )
702749

703750
private Driver createDriver( URI boltUri )
704751
{
705-
Logging devNullLogging = new Logging()
706-
{
707-
@Override
708-
public Logger getLog( String name )
709-
{
710-
return DevNullLogger.DEV_NULL_LOGGER;
711-
}
712-
};
713-
714752
Config config = Config.build()
715-
.withLogging( devNullLogging )
753+
.withLogging( DEV_NULL_LOGGING )
716754
.toConfig();
717755

756+
return createDriver( boltUri, config );
757+
}
758+
759+
private Driver createDriver( URI boltUri, Config config )
760+
{
718761
return GraphDatabase.driver( boltUri, clusterRule.getDefaultAuthToken(), config );
719762
}
720763

driver/src/test/java/org/neo4j/driver/v1/integration/ConnectionPoolIT.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,21 +30,26 @@
3030

3131
import org.neo4j.driver.internal.cluster.RoutingSettings;
3232
import org.neo4j.driver.internal.util.ChannelTrackingDriverFactory;
33+
import org.neo4j.driver.internal.util.DriverFactoryWithOneEventLoopThread;
3334
import org.neo4j.driver.internal.util.FakeClock;
3435
import org.neo4j.driver.v1.Config;
3536
import org.neo4j.driver.v1.Driver;
3637
import org.neo4j.driver.v1.GraphDatabase;
3738
import org.neo4j.driver.v1.Session;
3839
import org.neo4j.driver.v1.StatementResult;
3940
import org.neo4j.driver.v1.Transaction;
41+
import org.neo4j.driver.v1.exceptions.ClientException;
4042
import org.neo4j.driver.v1.util.TestNeo4j;
4143

4244
import static java.util.concurrent.TimeUnit.SECONDS;
43-
import static junit.framework.TestCase.fail;
45+
import static org.hamcrest.Matchers.is;
4446
import static org.junit.Assert.assertEquals;
4547
import static org.junit.Assert.assertFalse;
48+
import static org.junit.Assert.assertThat;
4649
import static org.junit.Assert.assertTrue;
50+
import static org.junit.Assert.fail;
4751
import static org.neo4j.driver.internal.retry.RetrySettings.DEFAULT;
52+
import static org.neo4j.driver.internal.util.Matchers.connectionAcquisitionTimeoutError;
4853

4954
public class ConnectionPoolIT
5055
{
@@ -113,6 +118,28 @@ public void shouldDisposeChannelsBasedOnMaxLifetime() throws Exception
113118
assertTrue( channel2.isActive() );
114119
}
115120

121+
@Test
122+
public void shouldRespectMaxConnectionPoolSize()
123+
{
124+
int maxPoolSize = 3;
125+
Config config = Config.build()
126+
.withMaxConnectionPoolSize( maxPoolSize )
127+
.withConnectionAcquisitionTimeout( 542, TimeUnit.MILLISECONDS )
128+
.toConfig();
129+
130+
driver = new DriverFactoryWithOneEventLoopThread().newInstance( neo4j.uri(), neo4j.authToken(), config );
131+
132+
try
133+
{
134+
startAndCloseTransactions( driver, maxPoolSize + 1 );
135+
fail( "Exception expected" );
136+
}
137+
catch ( ClientException e )
138+
{
139+
assertThat( e, is( connectionAcquisitionTimeoutError( 542 ) ) );
140+
}
141+
}
142+
116143
@After
117144
public void cleanup() throws Exception
118145
{

driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.neo4j.driver.internal.logging.DevNullLogging;
4646
import org.neo4j.driver.internal.retry.RetrySettings;
4747
import org.neo4j.driver.internal.util.DriverFactoryWithFixedRetryLogic;
48+
import org.neo4j.driver.internal.util.DriverFactoryWithOneEventLoopThread;
4849
import org.neo4j.driver.internal.util.ServerVersion;
4950
import org.neo4j.driver.v1.AccessMode;
5051
import org.neo4j.driver.v1.AuthToken;
@@ -73,6 +74,7 @@
7374
import static org.hamcrest.Matchers.containsInAnyOrder;
7475
import static org.hamcrest.Matchers.emptyArray;
7576
import static org.hamcrest.Matchers.greaterThan;
77+
import static org.hamcrest.Matchers.is;
7678
import static org.hamcrest.Matchers.not;
7779
import static org.junit.Assert.assertEquals;
7880
import static org.junit.Assert.assertFalse;
@@ -87,6 +89,7 @@
8789
import static org.mockito.Mockito.spy;
8890
import static org.mockito.Mockito.times;
8991
import static org.mockito.Mockito.verify;
92+
import static org.neo4j.driver.internal.util.Matchers.connectionAcquisitionTimeoutError;
9093
import static org.neo4j.driver.internal.util.ServerVersion.v3_1_0;
9194
import static org.neo4j.driver.v1.Values.parameters;
9295
import static org.neo4j.driver.v1.util.DaemonThreadFactory.daemon;
@@ -1296,6 +1299,38 @@ public void shouldConsumePreviousResultBeforeRunningNewQuery()
12961299
}
12971300
}
12981301

1302+
@Test
1303+
public void shouldNotRetryOnConnectionAcquisitionTimeout()
1304+
{
1305+
int maxPoolSize = 3;
1306+
Config config = Config.build()
1307+
.withMaxConnectionPoolSize( maxPoolSize )
1308+
.withConnectionAcquisitionTimeout( 0, TimeUnit.SECONDS )
1309+
.withMaxTransactionRetryTime( 42, TimeUnit.DAYS ) // retry for a really long time
1310+
.toConfig();
1311+
1312+
driver = new DriverFactoryWithOneEventLoopThread().newInstance( neo4j.uri(), neo4j.authToken(), config );
1313+
1314+
for ( int i = 0; i < maxPoolSize; i++ )
1315+
{
1316+
driver.session().beginTransaction();
1317+
}
1318+
1319+
AtomicInteger invocations = new AtomicInteger();
1320+
try
1321+
{
1322+
driver.session().writeTransaction( tx -> invocations.incrementAndGet() );
1323+
fail( "Exception expected" );
1324+
}
1325+
catch ( ClientException e )
1326+
{
1327+
assertThat( e, is( connectionAcquisitionTimeoutError( 0 ) ) );
1328+
}
1329+
1330+
// work should never be invoked
1331+
assertEquals( 0, invocations.get() );
1332+
}
1333+
12991334
private void assumeServerIs31OrLater()
13001335
{
13011336
ServerVersion serverVersion = ServerVersion.version( neo4j.driver() );

driver/src/test/java/org/neo4j/driver/v1/stress/CausalClusteringStressIT.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.neo4j.driver.v1.stress;
2020

21+
import org.junit.AfterClass;
2122
import org.junit.Rule;
2223

2324
import java.net.URI;
@@ -40,6 +41,7 @@
4041
import org.neo4j.driver.v1.exceptions.SessionExpiredException;
4142
import org.neo4j.driver.v1.summary.ResultSummary;
4243
import org.neo4j.driver.v1.util.cc.ClusterMemberRole;
44+
import org.neo4j.driver.v1.util.cc.ClusterRule;
4345
import org.neo4j.driver.v1.util.cc.LocalOrRemoteClusterRule;
4446

4547
import static org.hamcrest.Matchers.both;
@@ -54,6 +56,12 @@ public class CausalClusteringStressIT extends AbstractStressTestBase<CausalClust
5456
@Rule
5557
public final LocalOrRemoteClusterRule clusterRule = new LocalOrRemoteClusterRule();
5658

59+
@AfterClass
60+
public static void stopSharedCluster()
61+
{
62+
ClusterRule.stopSharedCluster();
63+
}
64+
5765
@Override
5866
URI databaseUri()
5967
{

driver/src/test/java/org/neo4j/driver/v1/util/Neo4jRunner.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,11 @@ public static synchronized Neo4jRunner getOrCreateGlobalRunner() throws IOExcept
8080
return globalInstance;
8181
}
8282

83+
public static synchronized boolean globalRunnerExists()
84+
{
85+
return globalInstance != null;
86+
}
87+
8388
private Neo4jRunner() throws IOException
8489
{
8590
try

0 commit comments

Comments
 (0)