Skip to content

Commit 1691d6e

Browse files
authored
Merge pull request #596 from PetrJanouch/4.0_lightweight-driver
Providing possibility to share event loops among driver instances
2 parents 8dad6c4 + ba17bfe commit 1691d6e

File tree

9 files changed

+172
-25
lines changed

9 files changed

+172
-25
lines changed

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.neo4j.driver.internal;
2020

2121
import io.netty.bootstrap.Bootstrap;
22+
import io.netty.channel.EventLoopGroup;
2223
import io.netty.util.concurrent.EventExecutorGroup;
2324
import io.netty.util.internal.logging.InternalLoggerFactory;
2425

@@ -71,19 +72,37 @@ public class DriverFactory
7172
public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings routingSettings,
7273
RetrySettings retrySettings, Config config )
7374
{
75+
return newInstance( uri, authToken, routingSettings, retrySettings, config, null );
76+
}
77+
78+
public final Driver newInstance ( URI uri, AuthToken authToken, RoutingSettings routingSettings,
79+
RetrySettings retrySettings, Config config, EventLoopGroup eventLoopGroup )
80+
{
81+
Bootstrap bootstrap;
82+
boolean ownsEventLoopGroup;
83+
if ( eventLoopGroup == null )
84+
{
85+
bootstrap = createBootstrap();
86+
ownsEventLoopGroup = true;
87+
}
88+
else
89+
{
90+
bootstrap = createBootstrap( eventLoopGroup );
91+
ownsEventLoopGroup = false;
92+
}
93+
7494
authToken = authToken == null ? AuthTokens.none() : authToken;
7595

7696
BoltServerAddress address = new BoltServerAddress( uri );
7797
RoutingSettings newRoutingSettings = routingSettings.withRoutingContext( new RoutingContext( uri ) );
7898
SecurityPlan securityPlan = createSecurityPlan( address, config );
7999

80100
InternalLoggerFactory.setDefaultFactory( new NettyLogging( config.logging() ) );
81-
Bootstrap bootstrap = createBootstrap();
82101
EventExecutorGroup eventExecutorGroup = bootstrap.config().group();
83102
RetryLogic retryLogic = createRetryLogic( retrySettings, eventExecutorGroup, config.logging() );
84103

85104
MetricsProvider metricsProvider = createDriverMetrics( config, createClock() );
86-
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, bootstrap, metricsProvider, config );
105+
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, bootstrap, metricsProvider, config, ownsEventLoopGroup );
87106

88107
InternalDriver driver = createDriver( uri, securityPlan, address, connectionPool, eventExecutorGroup, newRoutingSettings, retryLogic, metricsProvider, config );
89108

@@ -93,7 +112,7 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
93112
}
94113

95114
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap,
96-
MetricsProvider metricsProvider, Config config )
115+
MetricsProvider metricsProvider, Config config, boolean ownsEventLoopGroup )
97116
{
98117
Clock clock = createClock();
99118
ConnectionSettings settings = new ConnectionSettings( authToken, config.connectionTimeoutMillis() );
@@ -102,7 +121,7 @@ protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan
102121
config.connectionAcquisitionTimeoutMillis(), config.maxConnectionLifetimeMillis(),
103122
config.idleTimeBeforeConnectionTest()
104123
);
105-
return new ConnectionPoolImpl( connector, bootstrap, poolSettings, metricsProvider.metricsListener(), config.logging(), clock );
124+
return new ConnectionPoolImpl( connector, bootstrap, poolSettings, metricsProvider.metricsListener(), config.logging(), clock, ownsEventLoopGroup );
106125
}
107126

108127
protected static MetricsProvider createDriverMetrics( Config config, Clock clock )
@@ -271,6 +290,16 @@ protected Bootstrap createBootstrap()
271290
return BootstrapFactory.newBootstrap();
272291
}
273292

293+
/**
294+
* Creates new {@link Bootstrap}.
295+
* <p>
296+
* <b>This method is protected only for testing</b>
297+
*/
298+
protected Bootstrap createBootstrap( EventLoopGroup eventLoopGroup )
299+
{
300+
return BootstrapFactory.newBootstrap( eventLoopGroup );
301+
}
302+
274303
private static SecurityPlan createSecurityPlan( BoltServerAddress address, Config config )
275304
{
276305
try

driver/src/main/java/org/neo4j/driver/internal/async/connection/BootstrapFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public static Bootstrap newBootstrap( int threadCount )
3838
return newBootstrap( EventLoopGroupFactory.newEventLoopGroup( threadCount ) );
3939
}
4040

41-
private static Bootstrap newBootstrap( EventLoopGroup eventLoopGroup )
41+
public static Bootstrap newBootstrap( EventLoopGroup eventLoopGroup )
4242
{
4343
Bootstrap bootstrap = new Bootstrap();
4444
bootstrap.group( eventLoopGroup );

driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,17 +57,18 @@ public class ConnectionPoolImpl implements ConnectionPool
5757
private final Clock clock;
5858
private final Logger log;
5959
private final MetricsListener metricsListener;
60+
private final boolean ownsEventLoopGroup;
6061

6162
private final ConcurrentMap<BoltServerAddress,ChannelPool> pools = new ConcurrentHashMap<>();
6263
private final AtomicBoolean closed = new AtomicBoolean();
6364

64-
public ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, PoolSettings settings, MetricsListener metricsListener, Logging logging, Clock clock )
65+
public ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, PoolSettings settings, MetricsListener metricsListener, Logging logging, Clock clock, boolean ownsEventLoopGroup )
6566
{
66-
this( connector, bootstrap, new NettyChannelTracker( metricsListener, bootstrap.config().group().next(), logging ), settings, metricsListener, logging, clock );
67+
this( connector, bootstrap, new NettyChannelTracker( metricsListener, bootstrap.config().group().next(), logging ), settings, metricsListener, logging, clock, ownsEventLoopGroup );
6768
}
6869

6970
ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, NettyChannelTracker nettyChannelTracker,
70-
PoolSettings settings, MetricsListener metricsListener, Logging logging, Clock clock )
71+
PoolSettings settings, MetricsListener metricsListener, Logging logging, Clock clock, boolean ownsEventLoopGroup )
7172
{
7273
this.connector = connector;
7374
this.bootstrap = bootstrap;
@@ -77,6 +78,7 @@ public ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, Pool
7778
this.metricsListener = metricsListener;
7879
this.clock = clock;
7980
this.log = logging.getLog( ConnectionPool.class.getSimpleName() );
81+
this.ownsEventLoopGroup = ownsEventLoopGroup;
8082
}
8183

8284
@Override
@@ -166,12 +168,20 @@ public CompletionStage<Void> close()
166168
}
167169
finally
168170
{
169-
// This is an attempt to speed up the shut down procedure of the driver
170-
// Feel free return this back to shutdownGracefully() method with default values
171-
// if this proves troublesome!!!
172-
eventLoopGroup().shutdownGracefully(200, 15_000, TimeUnit.MILLISECONDS);
171+
172+
if (ownsEventLoopGroup) {
173+
// This is an attempt to speed up the shut down procedure of the driver
174+
// Feel free return this back to shutdownGracefully() method with default values
175+
// if this proves troublesome!!!
176+
eventLoopGroup().shutdownGracefully(200, 15_000, TimeUnit.MILLISECONDS);
177+
}
173178
}
174179
}
180+
if (!ownsEventLoopGroup)
181+
{
182+
return Futures.completedWithNull();
183+
}
184+
175185
return Futures.asCompletionStage( eventLoopGroup().terminationFuture() )
176186
.thenApply( ignore -> null );
177187
}

driver/src/test/java/org/neo4j/driver/integration/ConnectionHandlingIT.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -442,15 +442,15 @@ private static class DriverFactoryWithConnectionPool extends DriverFactory
442442

443443
@Override
444444
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap,
445-
MetricsProvider ignored, Config config )
445+
MetricsProvider ignored, Config config, boolean ownsEventLoopGroup )
446446
{
447447
ConnectionSettings connectionSettings = new ConnectionSettings( authToken, 1000 );
448448
PoolSettings poolSettings = new PoolSettings( config.maxConnectionPoolSize(),
449449
config.connectionAcquisitionTimeoutMillis(), config.maxConnectionLifetimeMillis(),
450450
config.idleTimeBeforeConnectionTest() );
451451
Clock clock = createClock();
452452
ChannelConnector connector = super.createConnector( connectionSettings, securityPlan, config, clock );
453-
connectionPool = new MemorizingConnectionPool( connector, bootstrap, poolSettings, config.logging(), clock );
453+
connectionPool = new MemorizingConnectionPool( connector, bootstrap, poolSettings, config.logging(), clock, ownsEventLoopGroup );
454454
return connectionPool;
455455
}
456456
}
@@ -461,9 +461,9 @@ private static class MemorizingConnectionPool extends ConnectionPoolImpl
461461
boolean memorize;
462462

463463
MemorizingConnectionPool( ChannelConnector connector, Bootstrap bootstrap, PoolSettings settings,
464-
Logging logging, Clock clock )
464+
Logging logging, Clock clock, boolean ownsEventLoopGroup )
465465
{
466-
super( connector, bootstrap, settings, DEV_NULL_METRICS, logging, clock );
466+
super( connector, bootstrap, settings, DEV_NULL_METRICS, logging, clock, ownsEventLoopGroup );
467467
}
468468

469469
void startMemorizing()
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright (c) 2002-2019 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.integration;
20+
21+
import io.netty.channel.EventLoopGroup;
22+
import io.netty.channel.nio.NioEventLoopGroup;
23+
import org.junit.jupiter.api.Test;
24+
import org.junit.jupiter.api.extension.RegisterExtension;
25+
26+
import java.util.concurrent.TimeUnit;
27+
28+
import org.neo4j.driver.Config;
29+
import org.neo4j.driver.Driver;
30+
import org.neo4j.driver.Session;
31+
import org.neo4j.driver.internal.DriverFactory;
32+
import org.neo4j.driver.internal.cluster.RoutingSettings;
33+
import org.neo4j.driver.internal.retry.RetrySettings;
34+
import org.neo4j.driver.util.DatabaseExtension;
35+
import org.neo4j.driver.util.ParallelizableIT;
36+
37+
import static org.junit.jupiter.api.Assertions.fail;
38+
39+
@ParallelizableIT
40+
class SharedEventLoopIT
41+
{
42+
private final DriverFactory driverFactory = new DriverFactory();
43+
44+
@RegisterExtension
45+
static final DatabaseExtension neo4j = new DatabaseExtension();
46+
47+
@Test
48+
void testDriverShouldNotCloseSharedEventLoop()
49+
{
50+
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup( 1 );
51+
52+
try
53+
{
54+
Driver driver1 = createDriver( eventLoopGroup );
55+
Driver driver2 = createDriver( eventLoopGroup );
56+
57+
testConnection( driver1 );
58+
testConnection( driver2 );
59+
60+
driver1.close();
61+
62+
testConnection( driver2 );
63+
driver2.close();
64+
}
65+
finally
66+
{
67+
eventLoopGroup.shutdownGracefully( 100, 100, TimeUnit.MILLISECONDS );
68+
}
69+
}
70+
71+
@Test
72+
void testDriverShouldUseSharedEventLoop()
73+
{
74+
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup( 1 );
75+
76+
Driver driver = createDriver( eventLoopGroup );
77+
testConnection( driver );
78+
79+
eventLoopGroup.shutdownGracefully( 100, 100, TimeUnit.MILLISECONDS );
80+
81+
// the driver should fail if it really uses the provided event loop
82+
// if the call succeeds, it meas that the driver created its own event loop
83+
try
84+
{
85+
testConnection( driver );
86+
fail( "Exception expected" );
87+
}
88+
catch ( Exception e )
89+
{
90+
// ignored
91+
}
92+
}
93+
94+
private Driver createDriver( EventLoopGroup eventLoopGroup )
95+
{
96+
RoutingSettings routingSettings = new RoutingSettings( 1000, 1000 );
97+
RetrySettings retrySettings = new RetrySettings( 1000 );
98+
return driverFactory.newInstance( neo4j.uri(), neo4j.authToken(), routingSettings, retrySettings, Config.build().build(), eventLoopGroup );
99+
}
100+
101+
private void testConnection( Driver driver )
102+
{
103+
try ( Session session = driver.session() )
104+
{
105+
session.run( "RETURN 1" );
106+
}
107+
}
108+
}

driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ protected InternalDriver createRoutingDriver( SecurityPlan securityPlan, BoltSer
223223

224224
@Override
225225
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap,
226-
MetricsProvider metricsProvider, Config config )
226+
MetricsProvider metricsProvider, Config config, boolean ownsEventLoopGroup )
227227
{
228228
return connectionPool;
229229
}
@@ -259,7 +259,7 @@ protected SessionFactory createSessionFactory( ConnectionProvider connectionProv
259259

260260
@Override
261261
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap,
262-
MetricsProvider metricsProvider, Config config )
262+
MetricsProvider metricsProvider, Config config, boolean ownsEventLoopGroup )
263263
{
264264
return connectionPoolMock();
265265
}
@@ -282,7 +282,7 @@ protected Bootstrap createBootstrap()
282282

283283
@Override
284284
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap,
285-
MetricsProvider metricsProvider, Config config )
285+
MetricsProvider metricsProvider, Config config, boolean ownsEventLoopGroup )
286286
{
287287
return connectionPoolMock();
288288
}

driver/src/test/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImplIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ private ConnectionPoolImpl newPool() throws Exception
207207
DEV_NULL_LOGGING, clock );
208208
PoolSettings poolSettings = newSettings();
209209
Bootstrap bootstrap = BootstrapFactory.newBootstrap( 1 );
210-
return new ConnectionPoolImpl( connector, bootstrap, poolSettings, DEV_NULL_METRICS, DEV_NULL_LOGGING, clock );
210+
return new ConnectionPoolImpl( connector, bootstrap, poolSettings, DEV_NULL_METRICS, DEV_NULL_LOGGING, clock, true );
211211
}
212212

213213
private static PoolSettings newSettings()
@@ -222,7 +222,7 @@ private static class TestConnectionPool extends ConnectionPoolImpl
222222
TestConnectionPool( NettyChannelTracker nettyChannelTracker )
223223
{
224224
super( mock( ChannelConnector.class ), mock( Bootstrap.class ), nettyChannelTracker, newSettings(),
225-
DEV_NULL_METRICS, DEV_NULL_LOGGING, new FakeClock() );
225+
DEV_NULL_METRICS, DEV_NULL_LOGGING, new FakeClock(), true );
226226
}
227227

228228
ChannelPool getPool( BoltServerAddress address )

driver/src/test/java/org/neo4j/driver/internal/util/FailingConnectionDriverFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ public class FailingConnectionDriverFactory extends DriverFactory
4242

4343
@Override
4444
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap,
45-
MetricsProvider metricsProvider, Config config )
45+
MetricsProvider metricsProvider, Config config, boolean ownsEventLoopGroup )
4646
{
47-
ConnectionPool pool = super.createConnectionPool( authToken, securityPlan, bootstrap, metricsProvider, config );
47+
ConnectionPool pool = super.createConnectionPool( authToken, securityPlan, bootstrap, metricsProvider, config, ownsEventLoopGroup );
4848
return new ConnectionPoolWithFailingConnections( pool, nextRunFailure );
4949
}
5050

driver/src/test/java/org/neo4j/driver/internal/util/io/ChannelTrackingDriverFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,9 @@ protected final ChannelConnector createConnector( ConnectionSettings settings, S
7474

7575
@Override
7676
protected final ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap,
77-
MetricsProvider metricsProvider, Config config )
77+
MetricsProvider metricsProvider, Config config, boolean ownsEventLoopGroup )
7878
{
79-
pool = super.createConnectionPool( authToken, securityPlan, bootstrap, metricsProvider, config );
79+
pool = super.createConnectionPool( authToken, securityPlan, bootstrap, metricsProvider, config, ownsEventLoopGroup );
8080
return pool;
8181
}
8282

0 commit comments

Comments
 (0)