Skip to content

Draft of async API #402

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 42 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
b867423
Simplified collectors
lutovich Aug 28, 2017
932f4e2
Initial attempt to use Netty in connection layer
lutovich Aug 24, 2017
7c69ace
Simple async connection with RUN, PULL_ALL and flush
lutovich Aug 25, 2017
88c79ea
Measure average query time, print less to sout
lutovich Aug 25, 2017
3e6c981
SSL support in netty-based connection
lutovich Aug 28, 2017
5930d93
Added rudimentary `Session#runAsync()`
lutovich Aug 28, 2017
1783b57
Initial work on netty channel pooling
lutovich Aug 29, 2017
e06f4e1
Fixed tracking of active connection counts
lutovich Aug 29, 2017
0848bae
Small cleanup of channel future listeners
lutovich Aug 30, 2017
515ace6
Send RESET before releasing channel to the pool
lutovich Aug 30, 2017
2fbb56d
Added async methods to StatementRunner
lutovich Sep 1, 2017
38382eb
Fix synchronization when handling records
lutovich Sep 3, 2017
6b57d2f
Respond to FAILURE with ACK_FAILURE
lutovich Sep 4, 2017
2867d4a
Initial support of async transactions
lutovich Sep 4, 2017
c48c68a
Further improve async transactions support
lutovich Sep 5, 2017
74a7c90
Improved ListenableFuture
lutovich Sep 6, 2017
f1d872f
Attempt to use futures bottom-up
lutovich Sep 6, 2017
26f5191
Small cleanup of InternalStatementResultCursor
lutovich Sep 7, 2017
9aeaca2
Async transactions support based on futures
lutovich Sep 7, 2017
7c34fad
Removed couple fixed TODOs
lutovich Sep 8, 2017
316ca88
Renamed internal package
lutovich Sep 8, 2017
142a92e
Clean up handling of outbound messages
lutovich Sep 8, 2017
2d1f0ec
Further simplify outbound message handling
lutovich Sep 8, 2017
00d2a25
Clean up handling of inbound messages
lutovich Sep 10, 2017
87830b6
Removed unused classes
lutovich Sep 10, 2017
daceaef
Server version check for bytes, pool improvements
lutovich Sep 11, 2017
10575ad
More unit tests and renaming
lutovich Sep 12, 2017
0f60d8e
Disallow commit a rolled back tx and vice versa
lutovich Sep 12, 2017
592bfbd
Enforce connection timeout for netty channels
lutovich Sep 12, 2017
72ad277
Implemented and removed couple minor TODOs
lutovich Sep 12, 2017
76b8247
Cleanup around handling of futures
lutovich Sep 12, 2017
b8e2d56
Moved async pool related classes to separate package
lutovich Sep 12, 2017
0151807
More consistent method naming
lutovich Sep 12, 2017
0170043
Support long idle connection ping
lutovich Sep 13, 2017
04c2e12
Expose statement keys in StatementResultCursor
lutovich Sep 13, 2017
92e5f2c
Expose result summary in StatementResultCursor
lutovich Sep 13, 2017
cf9adc2
Task renamed to Response and moved to public package
lutovich Sep 14, 2017
fe910fb
Fixed couple unit test failures
lutovich Sep 15, 2017
6a109c2
Renamed `InternalFuture#asTask()` to `#asResponse()`
lutovich Sep 15, 2017
c87c35c
Constants for BEGIN, COMMIT and ROLLBACK
lutovich Sep 18, 2017
587301a
Made DEFAULT_USER_AGENT a private constant
lutovich Sep 18, 2017
5acd107
Simplify Response impl
lutovich Sep 18, 2017
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion driver/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
<build.revision></build.revision>
<bundle.name>${project.groupId}.${project.artifactId}</bundle.name>
<maven.build.timestamp.format>'v'yyyyMMdd-HHmm</maven.build.timestamp.format>
<netty.version>4.1.15.Final</netty.version>
</properties>

<parent>
Expand All @@ -32,7 +33,13 @@
</licenses>

<dependencies>
<!-- Test dependencies -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
Expand Down
39 changes: 39 additions & 0 deletions driver/src/main/java/org/neo4j/driver/ResultResourcesHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver;

public interface ResultResourcesHandler
{
void resultFetched();

void resultFailed( Throwable error );

ResultResourcesHandler NO_OP = new ResultResourcesHandler()
{
@Override
public void resultFetched()
{
}

@Override
public void resultFailed( Throwable error )
{
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal;
package org.neo4j.driver;

import org.neo4j.driver.internal.spi.Collector.NoOperationCollector;
import java.util.Collections;
import java.util.List;

class BookmarkCollector extends NoOperationCollector
public class StatementKeys
{
private final ExplicitTransaction transaction;
private List<String> keys;

BookmarkCollector( ExplicitTransaction transaction )
public boolean isPopulated()
{
this.transaction = transaction;
return keys != null;
}

@Override
public void bookmark( Bookmark bookmark )
public void set( List<String> keys )
{
transaction.setBookmark( bookmark );
this.keys = keys;
}

public List<String> asList()
{
return keys == null ? Collections.<String>emptyList() : keys;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,18 @@ private static String driverVersion()

private final AuthToken authToken;
private final String userAgent;
private final int timeoutMillis;
private final int connectTimeoutMillis;

public ConnectionSettings( AuthToken authToken, String userAgent, int timeoutMillis )
public ConnectionSettings( AuthToken authToken, String userAgent, int connectTimeoutMillis )
{
this.authToken = authToken;
this.userAgent = userAgent;
this.timeoutMillis = timeoutMillis;
this.connectTimeoutMillis = connectTimeoutMillis;
}

public ConnectionSettings( AuthToken authToken, int timeoutMillis )
public ConnectionSettings( AuthToken authToken, int connectTimeoutMillis )
{
this( authToken, DEFAULT_USER_AGENT, timeoutMillis );
this( authToken, DEFAULT_USER_AGENT, connectTimeoutMillis );
}

public AuthToken authToken()
Expand All @@ -76,8 +76,8 @@ public String userAgent()
return userAgent;
}

public int timeoutMillis()
public int connectTimeoutMillis()
{
return timeoutMillis;
return connectTimeoutMillis;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package org.neo4j.driver.internal;

import org.neo4j.driver.internal.async.AsyncConnection;
import org.neo4j.driver.internal.async.InternalFuture;
import org.neo4j.driver.internal.async.pool.AsyncConnectionPool;
import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.internal.spi.ConnectionPool;
import org.neo4j.driver.internal.spi.ConnectionProvider;
Expand All @@ -32,11 +35,13 @@ public class DirectConnectionProvider implements ConnectionProvider
{
private final BoltServerAddress address;
private final ConnectionPool pool;
private final AsyncConnectionPool asyncPool;

DirectConnectionProvider( BoltServerAddress address, ConnectionPool pool )
DirectConnectionProvider( BoltServerAddress address, ConnectionPool pool, AsyncConnectionPool asyncPool )
{
this.address = address;
this.pool = pool;
this.asyncPool = asyncPool;

verifyConnectivity();
}
Expand All @@ -47,10 +52,17 @@ public PooledConnection acquireConnection( AccessMode mode )
return pool.acquire( address );
}

@Override
public InternalFuture<AsyncConnection> acquireAsyncConnection( AccessMode mode )
{
return asyncPool.acquire( address );
}

@Override
public void close() throws Exception
{
pool.close();
asyncPool.closeAsync().syncUninterruptibly();
}

public BoltServerAddress getAddress()
Expand Down
48 changes: 39 additions & 9 deletions driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,17 @@
*/
package org.neo4j.driver.internal;

import io.netty.bootstrap.Bootstrap;

import java.io.IOException;
import java.net.URI;
import java.security.GeneralSecurityException;

import org.neo4j.driver.internal.async.AsyncConnectorImpl;
import org.neo4j.driver.internal.async.BootstrapFactory;
import org.neo4j.driver.internal.async.pool.ActiveChannelTracker;
import org.neo4j.driver.internal.async.pool.AsyncConnectionPool;
import org.neo4j.driver.internal.async.pool.AsyncConnectionPoolImpl;
import org.neo4j.driver.internal.cluster.RoutingContext;
import org.neo4j.driver.internal.cluster.RoutingSettings;
import org.neo4j.driver.internal.cluster.loadbalancing.LeastConnectedLoadBalancingStrategy;
Expand Down Expand Up @@ -59,22 +66,28 @@ public class DriverFactory
public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings routingSettings,
RetrySettings retrySettings, Config config )
{
authToken = authToken == null ? AuthTokens.none() : authToken;

BoltServerAddress address = new BoltServerAddress( uri );
RoutingSettings newRoutingSettings = routingSettings.withRoutingContext( new RoutingContext( uri ) );
SecurityPlan securityPlan = createSecurityPlan( address, config );
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, config );
RetryLogic retryLogic = createRetryLogic( retrySettings, config.logging() );

AsyncConnectionPool asyncConnectionPool = createAsyncConnectionPool( authToken, securityPlan, config );

try
{
return createDriver( uri, address, connectionPool, config, newRoutingSettings, securityPlan, retryLogic );
return createDriver( uri, address, connectionPool, config, newRoutingSettings, securityPlan, retryLogic,
asyncConnectionPool );
}
catch ( Throwable driverError )
{
// we need to close the connection pool if driver creation threw exception
try
{
connectionPool.close();
asyncConnectionPool.closeAsync().syncUninterruptibly();
}
catch ( Throwable closeError )
{
Expand All @@ -84,16 +97,33 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
}
}

private AsyncConnectionPool createAsyncConnectionPool( AuthToken authToken, SecurityPlan securityPlan,
Config config )
{
Clock clock = createClock();
ConnectionSettings connectionSettings = new ConnectionSettings( authToken, config.connectionTimeoutMillis() );
ActiveChannelTracker activeChannelTracker = new ActiveChannelTracker( config.logging() );
AsyncConnectorImpl connector = new AsyncConnectorImpl( connectionSettings, securityPlan,
activeChannelTracker, config.logging(), clock );
Bootstrap bootstrap = BootstrapFactory.newBootstrap();
PoolSettings poolSettings = new PoolSettings( config.maxIdleConnectionPoolSize(),
config.idleTimeBeforeConnectionTest(), config.maxConnectionLifetimeMillis(),
config.maxConnectionPoolSize(),
config.connectionAcquisitionTimeoutMillis() );
return new AsyncConnectionPoolImpl( connector, bootstrap, activeChannelTracker, poolSettings, config.logging(),
clock );
}

private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool connectionPool,
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan,
RetryLogic retryLogic )
RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool )
{
String scheme = uri.getScheme().toLowerCase();
switch ( scheme )
{
case BOLT_URI_SCHEME:
assertNoRoutingContext( uri, routingSettings );
return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic );
return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic, asyncConnectionPool );
case BOLT_ROUTING_URI_SCHEME:
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic );
default:
Expand All @@ -107,9 +137,10 @@ private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool
* <b>This method is protected only for testing</b>
*/
protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config,
SecurityPlan securityPlan, RetryLogic retryLogic )
SecurityPlan securityPlan, RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool )
{
ConnectionProvider connectionProvider = new DirectConnectionProvider( address, connectionPool );
ConnectionProvider connectionProvider =
new DirectConnectionProvider( address, connectionPool, asyncConnectionPool );
SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
return createDriver( config, securityPlan, sessionFactory );
}
Expand Down Expand Up @@ -173,11 +204,10 @@ private static LoadBalancingStrategy createLoadBalancingStrategy( Config config,
*/
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Config config )
{
authToken = authToken == null ? AuthTokens.none() : authToken;

ConnectionSettings connectionSettings = new ConnectionSettings( authToken, config.connectionTimeoutMillis() );
PoolSettings poolSettings = new PoolSettings( config.maxIdleConnectionPoolSize(),
config.idleTimeBeforeConnectionTest(), config.maxConnectionLifetime() );
config.idleTimeBeforeConnectionTest(), config.maxConnectionLifetimeMillis(),
config.maxConnectionPoolSize(), config.connectionAcquisitionTimeoutMillis() );
Connector connector = createConnector( connectionSettings, securityPlan, config.logging() );

return new SocketConnectionPool( poolSettings, connector, createClock(), config.logging() );
Expand All @@ -198,7 +228,7 @@ protected Clock createClock()
* <p>
* <b>This method is protected only for testing</b>
*/
protected Connector createConnector( ConnectionSettings connectionSettings, SecurityPlan securityPlan,
protected Connector createConnector( final ConnectionSettings connectionSettings, SecurityPlan securityPlan,
Logging logging )
{
return new SocketConnector( connectionSettings, securityPlan, logging );
Expand Down
Loading