Skip to content

Commit 378ae43

Browse files
author
Zhen Li
authored
Merge pull request #324 from lutovich/1.2-separate-conn-from-session
Decouple session from connection
2 parents 9d6c2fd + d860477 commit 378ae43

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+1861
-761
lines changed

driver/src/main/java/org/neo4j/driver/internal/DirectDriver.java renamed to driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java

Lines changed: 15 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,51 +19,39 @@
1919
package org.neo4j.driver.internal;
2020

2121
import org.neo4j.driver.internal.net.BoltServerAddress;
22-
import org.neo4j.driver.internal.security.SecurityPlan;
2322
import org.neo4j.driver.internal.spi.ConnectionPool;
23+
import org.neo4j.driver.internal.spi.ConnectionProvider;
24+
import org.neo4j.driver.internal.spi.PooledConnection;
2425
import org.neo4j.driver.v1.AccessMode;
25-
import org.neo4j.driver.v1.Logging;
26-
import org.neo4j.driver.v1.Session;
2726

28-
import static java.lang.String.format;
29-
30-
public class DirectDriver extends BaseDriver
27+
/**
28+
* Simple {@link ConnectionProvider connection provider} that obtains connections form the given pool only for
29+
* the given address.
30+
*/
31+
public class DirectConnectionProvider implements ConnectionProvider
3132
{
3233
private final BoltServerAddress address;
33-
protected final ConnectionPool connections;
34+
private final ConnectionPool pool;
3435

35-
public DirectDriver(
36-
BoltServerAddress address,
37-
ConnectionPool connections,
38-
SecurityPlan securityPlan,
39-
SessionFactory sessionFactory,
40-
Logging logging )
36+
DirectConnectionProvider( BoltServerAddress address, ConnectionPool pool )
4137
{
42-
super( securityPlan, sessionFactory, logging );
4338
this.address = address;
44-
this.connections = connections;
39+
this.pool = pool;
4540
}
4641

4742
@Override
48-
protected Session newSessionWithMode( AccessMode mode )
43+
public PooledConnection acquireConnection( AccessMode mode )
4944
{
50-
return sessionFactory.newInstance( connections.acquire( address ) );
45+
return pool.acquire( address );
5146
}
5247

5348
@Override
54-
protected void closeResources()
49+
public void close() throws Exception
5550
{
56-
try
57-
{
58-
connections.close();
59-
}
60-
catch ( Exception ex )
61-
{
62-
log.error( format( "~~ [ERROR] %s", ex.getMessage() ), ex );
63-
}
51+
pool.close();
6452
}
6553

66-
BoltServerAddress server()
54+
public BoltServerAddress getAddress()
6755
{
6856
return address;
6957
}

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 50 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@
2222
import java.net.URI;
2323
import java.security.GeneralSecurityException;
2424

25+
import org.neo4j.driver.internal.cluster.LoadBalancer;
2526
import org.neo4j.driver.internal.cluster.RoutingSettings;
2627
import org.neo4j.driver.internal.net.BoltServerAddress;
2728
import org.neo4j.driver.internal.net.SocketConnector;
2829
import org.neo4j.driver.internal.net.pooling.PoolSettings;
2930
import org.neo4j.driver.internal.net.pooling.SocketConnectionPool;
3031
import org.neo4j.driver.internal.security.SecurityPlan;
3132
import org.neo4j.driver.internal.spi.ConnectionPool;
33+
import org.neo4j.driver.internal.spi.ConnectionProvider;
3234
import org.neo4j.driver.internal.spi.Connector;
3335
import org.neo4j.driver.internal.util.Clock;
3436
import org.neo4j.driver.v1.AuthToken;
@@ -50,13 +52,10 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
5052
BoltServerAddress address = BoltServerAddress.from( uri );
5153
SecurityPlan securityPlan = createSecurityPlan( address, config );
5254
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, config );
53-
SessionFactory sessionFactory = createSessionFactory( config );
5455

5556
try
5657
{
57-
return createDriver( address, uri.getScheme(), connectionPool, config, routingSettings, securityPlan,
58-
sessionFactory
59-
);
58+
return createDriver( address, uri.getScheme(), connectionPool, config, routingSettings, securityPlan );
6059
}
6160
catch ( Throwable driverError )
6261
{
@@ -74,42 +73,68 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
7473
}
7574

7675
private Driver createDriver( BoltServerAddress address, String scheme, ConnectionPool connectionPool,
77-
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan,
78-
SessionFactory sessionFactory )
76+
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan )
7977
{
8078
switch ( scheme.toLowerCase() )
8179
{
8280
case "bolt":
83-
return createDirectDriver( address, connectionPool, config, securityPlan, sessionFactory );
81+
return createDirectDriver( address, connectionPool, config, securityPlan );
8482
case "bolt+routing":
85-
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan,
86-
sessionFactory );
83+
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan );
8784
default:
8885
throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) );
8986
}
9087
}
9188

9289
/**
93-
* Creates new {@link DirectDriver}.
90+
* Creates a new driver for "bolt" scheme.
9491
* <p>
9592
* <b>This method is protected only for testing</b>
9693
*/
97-
protected DirectDriver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool,
98-
Config config, SecurityPlan securityPlan, SessionFactory sessionFactory )
94+
protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config,
95+
SecurityPlan securityPlan )
9996
{
100-
return new DirectDriver( address, connectionPool, securityPlan, sessionFactory, config.logging() );
97+
ConnectionProvider connectionProvider = new DirectConnectionProvider( address, connectionPool );
98+
SessionFactory sessionFactory = createSessionFactory( connectionProvider, config );
99+
return createDriver( config, securityPlan, sessionFactory );
101100
}
102101

103102
/**
104-
* Creates new {@link RoutingDriver}.
103+
* Creates new a new driver for "bolt+routing" scheme.
105104
* <p>
106105
* <b>This method is protected only for testing</b>
107106
*/
108-
protected RoutingDriver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool,
109-
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, SessionFactory sessionFactory )
107+
protected Driver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool,
108+
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan )
110109
{
111-
return new RoutingDriver( routingSettings, address, connectionPool, securityPlan, sessionFactory,
112-
createClock(), config.logging() );
110+
if ( !securityPlan.isRoutingCompatible() )
111+
{
112+
throw new IllegalArgumentException( "The chosen security plan is not compatible with a routing driver" );
113+
}
114+
ConnectionProvider connectionProvider = createLoadBalancer( address, connectionPool, config, routingSettings );
115+
SessionFactory sessionFactory = createSessionFactory( connectionProvider, config );
116+
return createDriver( config, securityPlan, sessionFactory );
117+
}
118+
119+
/**
120+
* Creates new {@link Driver}.
121+
* <p>
122+
* <b>This method is protected only for testing</b>
123+
*/
124+
protected InternalDriver createDriver( Config config, SecurityPlan securityPlan, SessionFactory sessionFactory )
125+
{
126+
return new InternalDriver( securityPlan, sessionFactory, config.logging() );
127+
}
128+
129+
/**
130+
* Creates new {@link LoadBalancer} for the routing driver.
131+
* <p>
132+
* <b>This method is protected only for testing</b>
133+
*/
134+
protected LoadBalancer createLoadBalancer( BoltServerAddress address, ConnectionPool connectionPool, Config config,
135+
RoutingSettings routingSettings )
136+
{
137+
return new LoadBalancer( routingSettings, connectionPool, createClock(), config.logging(), address );
113138
}
114139

115140
/**
@@ -150,13 +175,14 @@ protected Connector createConnector( ConnectionSettings connectionSettings, Secu
150175
return new SocketConnector( connectionSettings, securityPlan, logging );
151176
}
152177

153-
private static SessionFactory createSessionFactory( Config config )
178+
/**
179+
* Creates new {@link SessionFactory}.
180+
* <p>
181+
* <b>This method is protected only for testing</b>
182+
*/
183+
protected SessionFactory createSessionFactory( ConnectionProvider connectionProvider, Config config )
154184
{
155-
if ( config.logLeakedSessions() )
156-
{
157-
return new LeakLoggingNetworkSessionFactory( config.logging() );
158-
}
159-
return new NetworkSessionFactory();
185+
return new SessionFactoryImpl( connectionProvider, config, config.logging() );
160186
}
161187

162188
private static SecurityPlan createSecurityPlan( BoltServerAddress address, Config config )

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -65,21 +65,21 @@ private enum State
6565
ROLLED_BACK
6666
}
6767

68-
private final Runnable cleanup;
68+
private final SessionResourcesHandler resourcesHandler;
6969
private final Connection conn;
7070

7171
private String bookmark = null;
7272
private State state = State.ACTIVE;
7373

74-
public ExplicitTransaction( Connection conn, Runnable cleanup )
74+
public ExplicitTransaction( Connection conn, SessionResourcesHandler resourcesHandler )
7575
{
76-
this( conn, cleanup, null );
76+
this( conn, resourcesHandler, null );
7777
}
7878

79-
ExplicitTransaction( Connection conn, Runnable cleanup, String bookmark )
79+
ExplicitTransaction( Connection conn, SessionResourcesHandler resourcesHandler, String bookmark )
8080
{
8181
this.conn = conn;
82-
this.cleanup = cleanup;
82+
this.resourcesHandler = resourcesHandler;
8383
runBeginStatement( conn, bookmark );
8484
}
8585

@@ -139,7 +139,7 @@ else if ( state == State.MARKED_FAILED || state == State.ACTIVE )
139139
}
140140
finally
141141
{
142-
cleanup.run();
142+
resourcesHandler.onTransactionClosed( this );
143143
}
144144
}
145145

@@ -185,13 +185,14 @@ public synchronized StatementResult run( Statement statement )
185185

186186
try
187187
{
188-
InternalStatementResult cursor = new InternalStatementResult( conn, this, statement );
188+
InternalStatementResult result =
189+
new InternalStatementResult( conn, SessionResourcesHandler.NO_OP, this, statement );
189190
conn.run( statement.text(),
190191
statement.parameters().asMap( ofValue() ),
191-
cursor.runResponseCollector() );
192-
conn.pullAll( cursor.pullAllResponseCollector() );
192+
result.runResponseCollector() );
193+
conn.pullAll( result.pullAllResponseCollector() );
193194
conn.flush();
194-
return cursor;
195+
return result;
195196
}
196197
catch ( Neo4jException e )
197198
{

driver/src/main/java/org/neo4j/driver/internal/BaseDriver.java renamed to driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,19 @@
2727
import org.neo4j.driver.v1.Logging;
2828
import org.neo4j.driver.v1.Session;
2929

30-
abstract class BaseDriver implements Driver
30+
import static java.lang.String.format;
31+
32+
public class InternalDriver implements Driver
3133
{
3234
private final static String DRIVER_LOG_NAME = "Driver";
3335

3436
private final SecurityPlan securityPlan;
35-
protected final SessionFactory sessionFactory;
36-
protected final Logger log;
37+
private final SessionFactory sessionFactory;
38+
private final Logger log;
3739

3840
private AtomicBoolean closed = new AtomicBoolean( false );
3941

40-
BaseDriver( SecurityPlan securityPlan, SessionFactory sessionFactory, Logging logging )
42+
InternalDriver( SecurityPlan securityPlan, SessionFactory sessionFactory, Logging logging )
4143
{
4244
this.securityPlan = securityPlan;
4345
this.sessionFactory = sessionFactory;
@@ -61,8 +63,8 @@ public final Session session()
6163
public final Session session( AccessMode mode )
6264
{
6365
assertOpen();
64-
Session session = newSessionWithMode( mode );
65-
if( closed.get() )
66+
Session session = sessionFactory.newInstance( mode );
67+
if ( closed.get() )
6668
{
6769
// the driver is already closed and we either 1. obtain this session from the old session pool
6870
// or 2. we obtain this session from a new session pool
@@ -77,15 +79,35 @@ public final Session session( AccessMode mode )
7779
@Override
7880
public final void close()
7981
{
80-
if ( closed.compareAndSet(false, true) )
82+
if ( closed.compareAndSet( false, true ) )
8183
{
8284
closeResources();
8385
}
8486
}
8587

86-
protected abstract Session newSessionWithMode( AccessMode mode );
88+
/**
89+
* Get the underlying session factory.
90+
* <p>
91+
* <b>This method is only for testing</b>
92+
*
93+
* @return the session factory used by this driver.
94+
*/
95+
public final SessionFactory getSessionFactory()
96+
{
97+
return sessionFactory;
98+
}
8799

88-
protected abstract void closeResources();
100+
private void closeResources()
101+
{
102+
try
103+
{
104+
sessionFactory.close();
105+
}
106+
catch ( Exception ex )
107+
{
108+
log.error( format( "~~ [ERROR] %s", ex.getMessage() ), ex );
109+
}
110+
}
89111

90112
private void assertOpen()
91113
{
@@ -95,7 +117,7 @@ private void assertOpen()
95117
}
96118
}
97119

98-
private IllegalStateException driverCloseException()
120+
private static RuntimeException driverCloseException()
99121
{
100122
return new IllegalStateException( "This driver instance has already been closed" );
101123
}

0 commit comments

Comments
 (0)