Skip to content

Commit c6251ea

Browse files
authored
Merge pull request #415 from zhenlineo/1.5-sync-over-async
Blocking API over async API
2 parents c551f04 + ae29b49 commit c6251ea

File tree

195 files changed

+2416
-14228
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

195 files changed

+2416
-14228
lines changed

driver/src/main/java/org/neo4j/driver/internal/Bookmark.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.HashMap;
2323
import java.util.Iterator;
2424
import java.util.Map;
25+
import java.util.Objects;
2526

2627
import org.neo4j.driver.v1.Value;
2728

@@ -98,6 +99,28 @@ public Map<String,Value> asBeginTransactionParameters()
9899
return parameters;
99100
}
100101

102+
@Override
103+
public boolean equals( Object o )
104+
{
105+
if ( this == o )
106+
{
107+
return true;
108+
}
109+
if ( o == null || getClass() != o.getClass() )
110+
{
111+
return false;
112+
}
113+
Bookmark bookmark = (Bookmark) o;
114+
return Objects.equals( values, bookmark.values ) &&
115+
Objects.equals( maxValue, bookmark.maxValue );
116+
}
117+
118+
@Override
119+
public int hashCode()
120+
{
121+
return Objects.hash( values, maxValue );
122+
}
123+
101124
@Override
102125
public String toString()
103126
{

driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java

Lines changed: 12 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -20,71 +20,49 @@
2020

2121
import java.util.concurrent.CompletionStage;
2222

23-
import org.neo4j.driver.internal.async.AsyncConnection;
24-
import org.neo4j.driver.internal.async.pool.AsyncConnectionPool;
25-
import org.neo4j.driver.internal.net.BoltServerAddress;
23+
import org.neo4j.driver.internal.async.BoltServerAddress;
24+
import org.neo4j.driver.internal.spi.Connection;
2625
import org.neo4j.driver.internal.spi.ConnectionPool;
2726
import org.neo4j.driver.internal.spi.ConnectionProvider;
28-
import org.neo4j.driver.internal.spi.PooledConnection;
2927
import org.neo4j.driver.v1.AccessMode;
3028

29+
import static org.neo4j.driver.v1.AccessMode.READ;
30+
3131
/**
3232
* Simple {@link ConnectionProvider connection provider} that obtains connections form the given pool only for
3333
* the given address.
3434
*/
3535
public class DirectConnectionProvider implements ConnectionProvider
3636
{
3737
private final BoltServerAddress address;
38-
private final ConnectionPool pool;
39-
private final AsyncConnectionPool asyncPool;
38+
private final ConnectionPool connectionPool;
4039

41-
DirectConnectionProvider( BoltServerAddress address, ConnectionPool pool, AsyncConnectionPool asyncPool )
40+
DirectConnectionProvider( BoltServerAddress address, ConnectionPool connectionPool )
4241
{
4342
this.address = address;
44-
this.pool = pool;
45-
this.asyncPool = asyncPool;
46-
47-
verifyConnectivity();
43+
this.connectionPool = connectionPool;
4844
}
4945

5046
@Override
51-
public PooledConnection acquireConnection( AccessMode mode )
47+
public CompletionStage<Connection> acquireConnection( AccessMode mode )
5248
{
53-
return pool.acquire( address );
49+
return connectionPool.acquire( address );
5450
}
5551

5652
@Override
57-
public CompletionStage<AsyncConnection> acquireAsyncConnection( AccessMode mode )
53+
public CompletionStage<Void> verifyConnectivity()
5854
{
59-
return asyncPool.acquire( address );
55+
return acquireConnection( READ ).thenCompose( Connection::forceRelease );
6056
}
6157

6258
@Override
6359
public CompletionStage<Void> close()
6460
{
65-
// todo: remove this try-catch when blocking API works on top of async
66-
try
67-
{
68-
pool.close();
69-
}
70-
catch ( Exception e )
71-
{
72-
throw new RuntimeException( e );
73-
}
74-
return asyncPool.close();
61+
return connectionPool.close();
7562
}
7663

7764
public BoltServerAddress getAddress()
7865
{
7966
return address;
8067
}
81-
82-
/**
83-
* Acquires and releases a connection to verify connectivity so this connection provider fails fast. This is
84-
* especially valuable when driver was created with incorrect credentials.
85-
*/
86-
private void verifyConnectivity()
87-
{
88-
acquireConnection( AccessMode.READ ).close();
89-
}
9068
}

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 41 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -25,29 +25,26 @@
2525
import java.net.URI;
2626
import java.security.GeneralSecurityException;
2727

28-
import org.neo4j.driver.internal.async.AsyncConnectorImpl;
28+
import org.neo4j.driver.internal.async.BoltServerAddress;
2929
import org.neo4j.driver.internal.async.BootstrapFactory;
30-
import org.neo4j.driver.internal.async.Futures;
31-
import org.neo4j.driver.internal.async.pool.AsyncConnectionPool;
32-
import org.neo4j.driver.internal.async.pool.AsyncConnectionPoolImpl;
30+
import org.neo4j.driver.internal.async.ChannelConnector;
31+
import org.neo4j.driver.internal.async.ChannelConnectorImpl;
32+
import org.neo4j.driver.internal.async.pool.ConnectionPoolImpl;
33+
import org.neo4j.driver.internal.async.pool.PoolSettings;
3334
import org.neo4j.driver.internal.cluster.RoutingContext;
3435
import org.neo4j.driver.internal.cluster.RoutingSettings;
3536
import org.neo4j.driver.internal.cluster.loadbalancing.LeastConnectedLoadBalancingStrategy;
3637
import org.neo4j.driver.internal.cluster.loadbalancing.LoadBalancer;
3738
import org.neo4j.driver.internal.cluster.loadbalancing.LoadBalancingStrategy;
3839
import org.neo4j.driver.internal.cluster.loadbalancing.RoundRobinLoadBalancingStrategy;
39-
import org.neo4j.driver.internal.net.BoltServerAddress;
40-
import org.neo4j.driver.internal.net.SocketConnector;
41-
import org.neo4j.driver.internal.net.pooling.PoolSettings;
42-
import org.neo4j.driver.internal.net.pooling.SocketConnectionPool;
4340
import org.neo4j.driver.internal.retry.ExponentialBackoffRetryLogic;
4441
import org.neo4j.driver.internal.retry.RetryLogic;
4542
import org.neo4j.driver.internal.retry.RetrySettings;
4643
import org.neo4j.driver.internal.security.SecurityPlan;
4744
import org.neo4j.driver.internal.spi.ConnectionPool;
4845
import org.neo4j.driver.internal.spi.ConnectionProvider;
49-
import org.neo4j.driver.internal.spi.Connector;
5046
import org.neo4j.driver.internal.util.Clock;
47+
import org.neo4j.driver.internal.util.Futures;
5148
import org.neo4j.driver.v1.AuthToken;
5249
import org.neo4j.driver.v1.AuthTokens;
5350
import org.neo4j.driver.v1.Config;
@@ -72,27 +69,26 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
7269
BoltServerAddress address = new BoltServerAddress( uri );
7370
RoutingSettings newRoutingSettings = routingSettings.withRoutingContext( new RoutingContext( uri ) );
7471
SecurityPlan securityPlan = createSecurityPlan( address, config );
75-
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, config );
7672

7773
Bootstrap bootstrap = createBootstrap();
7874
EventExecutorGroup eventExecutorGroup = bootstrap.config().group();
7975
RetryLogic retryLogic = createRetryLogic( retrySettings, eventExecutorGroup, config.logging() );
8076

81-
AsyncConnectionPool asyncConnectionPool = createAsyncConnectionPool( authToken, securityPlan, bootstrap,
82-
config );
77+
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, bootstrap, config );
8378

8479
try
8580
{
86-
return createDriver( uri, address, connectionPool, asyncConnectionPool, config, newRoutingSettings,
81+
InternalDriver driver = createDriver( uri, address, connectionPool, config, newRoutingSettings,
8782
eventExecutorGroup, securityPlan, retryLogic );
83+
Futures.getBlocking( driver.verifyConnectivity() );
84+
return driver;
8885
}
8986
catch ( Throwable driverError )
9087
{
9188
// we need to close the connection pool if driver creation threw exception
9289
try
9390
{
94-
connectionPool.close();
95-
Futures.getBlocking( asyncConnectionPool.close() );
91+
Futures.getBlocking( connectionPool.close() );
9692
}
9793
catch ( Throwable closeError )
9894
{
@@ -102,32 +98,38 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
10298
}
10399
}
104100

105-
private AsyncConnectionPool createAsyncConnectionPool( AuthToken authToken, SecurityPlan securityPlan,
101+
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan,
106102
Bootstrap bootstrap, Config config )
107103
{
108104
Clock clock = createClock();
109105
ConnectionSettings settings = new ConnectionSettings( authToken, config.connectionTimeoutMillis() );
110-
AsyncConnectorImpl connector = new AsyncConnectorImpl( settings, securityPlan, config.logging(), clock );
106+
ChannelConnector connector = createConnector( settings, securityPlan, config, clock );
111107
PoolSettings poolSettings = new PoolSettings( config.maxIdleConnectionPoolSize(),
112108
config.idleTimeBeforeConnectionTest(), config.maxConnectionLifetimeMillis(),
113109
config.maxConnectionPoolSize(),
114110
config.connectionAcquisitionTimeoutMillis() );
115-
return new AsyncConnectionPoolImpl( connector, bootstrap, poolSettings, config.logging(), clock );
111+
return new ConnectionPoolImpl( connector, bootstrap, poolSettings, config.logging(), clock );
116112
}
117113

118-
private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool connectionPool,
119-
AsyncConnectionPool asyncConnectionPool, Config config, RoutingSettings routingSettings,
114+
protected ChannelConnector createConnector( ConnectionSettings settings, SecurityPlan securityPlan,
115+
Config config, Clock clock )
116+
{
117+
return new ChannelConnectorImpl( settings, securityPlan, config.logging(), clock );
118+
}
119+
120+
private InternalDriver createDriver( URI uri, BoltServerAddress address,
121+
ConnectionPool connectionPool, Config config, RoutingSettings routingSettings,
120122
EventExecutorGroup eventExecutorGroup, SecurityPlan securityPlan, RetryLogic retryLogic )
121123
{
122124
String scheme = uri.getScheme().toLowerCase();
123125
switch ( scheme )
124126
{
125127
case BOLT_URI_SCHEME:
126128
assertNoRoutingContext( uri, routingSettings );
127-
return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic, asyncConnectionPool );
129+
return createDirectDriver( address, config, securityPlan, retryLogic, connectionPool );
128130
case BOLT_ROUTING_URI_SCHEME:
129-
return createRoutingDriver( address, connectionPool, asyncConnectionPool, config, routingSettings,
130-
securityPlan, retryLogic, eventExecutorGroup );
131+
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic,
132+
eventExecutorGroup );
131133
default:
132134
throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) );
133135
}
@@ -138,11 +140,11 @@ private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool
138140
* <p>
139141
* <b>This method is protected only for testing</b>
140142
*/
141-
protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config,
142-
SecurityPlan securityPlan, RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool )
143+
protected InternalDriver createDirectDriver( BoltServerAddress address, Config config,
144+
SecurityPlan securityPlan, RetryLogic retryLogic, ConnectionPool connectionPool )
143145
{
144146
ConnectionProvider connectionProvider =
145-
new DirectConnectionProvider( address, connectionPool, asyncConnectionPool );
147+
new DirectConnectionProvider( address, connectionPool );
146148
SessionFactory sessionFactory =
147149
createSessionFactory( connectionProvider, retryLogic, config );
148150
return createDriver( config, securityPlan, sessionFactory );
@@ -153,16 +155,16 @@ protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool c
153155
* <p>
154156
* <b>This method is protected only for testing</b>
155157
*/
156-
protected Driver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool,
157-
AsyncConnectionPool asyncConnectionPool, Config config, RoutingSettings routingSettings,
158-
SecurityPlan securityPlan, RetryLogic retryLogic, EventExecutorGroup eventExecutorGroup )
158+
protected InternalDriver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool,
159+
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic,
160+
EventExecutorGroup eventExecutorGroup )
159161
{
160162
if ( !securityPlan.isRoutingCompatible() )
161163
{
162164
throw new IllegalArgumentException( "The chosen security plan is not compatible with a routing driver" );
163165
}
164-
ConnectionProvider connectionProvider = createLoadBalancer( address, connectionPool, asyncConnectionPool,
165-
eventExecutorGroup, config, routingSettings );
166+
ConnectionProvider connectionProvider = createLoadBalancer( address, connectionPool, eventExecutorGroup,
167+
config, routingSettings );
166168
SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
167169
return createDriver( config, securityPlan, sessionFactory );
168170
}
@@ -174,7 +176,7 @@ protected Driver createRoutingDriver( BoltServerAddress address, ConnectionPool
174176
*/
175177
protected InternalDriver createDriver( Config config, SecurityPlan securityPlan, SessionFactory sessionFactory )
176178
{
177-
return new InternalDriver( securityPlan, sessionFactory, config.logging() );
179+
return new InternalDriver( securityPlan, sessionFactory );
178180
}
179181

180182
/**
@@ -183,45 +185,27 @@ protected InternalDriver createDriver( Config config, SecurityPlan securityPlan,
183185
* <b>This method is protected only for testing</b>
184186
*/
185187
protected LoadBalancer createLoadBalancer( BoltServerAddress address, ConnectionPool connectionPool,
186-
AsyncConnectionPool asyncConnectionPool, EventExecutorGroup eventExecutorGroup,
187-
Config config, RoutingSettings routingSettings )
188+
EventExecutorGroup eventExecutorGroup, Config config, RoutingSettings routingSettings )
188189
{
189-
LoadBalancingStrategy loadBalancingStrategy =
190-
createLoadBalancingStrategy( config, connectionPool, asyncConnectionPool );
191-
return new LoadBalancer( address, routingSettings, connectionPool, asyncConnectionPool, eventExecutorGroup,
192-
createClock(), config.logging(), loadBalancingStrategy );
190+
LoadBalancingStrategy loadBalancingStrategy = createLoadBalancingStrategy( config, connectionPool );
191+
return new LoadBalancer( address, routingSettings, connectionPool, eventExecutorGroup, createClock(),
192+
config.logging(), loadBalancingStrategy );
193193
}
194194

195-
private static LoadBalancingStrategy createLoadBalancingStrategy( Config config, ConnectionPool connectionPool,
196-
AsyncConnectionPool asyncConnectionPool )
195+
private static LoadBalancingStrategy createLoadBalancingStrategy( Config config,
196+
ConnectionPool connectionPool )
197197
{
198198
switch ( config.loadBalancingStrategy() )
199199
{
200200
case ROUND_ROBIN:
201201
return new RoundRobinLoadBalancingStrategy( config.logging() );
202202
case LEAST_CONNECTED:
203-
return new LeastConnectedLoadBalancingStrategy( connectionPool, asyncConnectionPool, config.logging() );
203+
return new LeastConnectedLoadBalancingStrategy( connectionPool, config.logging() );
204204
default:
205205
throw new IllegalArgumentException( "Unknown load balancing strategy: " + config.loadBalancingStrategy() );
206206
}
207207
}
208208

209-
/**
210-
* Creates new {@link ConnectionPool}.
211-
* <p>
212-
* <b>This method is protected only for testing</b>
213-
*/
214-
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Config config )
215-
{
216-
ConnectionSettings connectionSettings = new ConnectionSettings( authToken, config.connectionTimeoutMillis() );
217-
PoolSettings poolSettings = new PoolSettings( config.maxIdleConnectionPoolSize(),
218-
config.idleTimeBeforeConnectionTest(), config.maxConnectionLifetimeMillis(),
219-
config.maxConnectionPoolSize(), config.connectionAcquisitionTimeoutMillis() );
220-
Connector connector = createConnector( connectionSettings, securityPlan, config.logging() );
221-
222-
return new SocketConnectionPool( poolSettings, connector, createClock(), config.logging() );
223-
}
224-
225209
/**
226210
* Creates new {@link Clock}.
227211
* <p>
@@ -232,17 +216,6 @@ protected Clock createClock()
232216
return Clock.SYSTEM;
233217
}
234218

235-
/**
236-
* Creates new {@link Connector}.
237-
* <p>
238-
* <b>This method is protected only for testing</b>
239-
*/
240-
protected Connector createConnector( final ConnectionSettings connectionSettings, SecurityPlan securityPlan,
241-
Logging logging )
242-
{
243-
return new SocketConnector( connectionSettings, securityPlan, logging );
244-
}
245-
246219
/**
247220
* Creates new {@link SessionFactory}.
248221
* <p>

0 commit comments

Comments
 (0)