diff --git a/driver/src/main/java/org/neo4j/driver/Session.java b/driver/src/main/java/org/neo4j/driver/Session.java
index 99cb719eb7..d65fbf3c23 100644
--- a/driver/src/main/java/org/neo4j/driver/Session.java
+++ b/driver/src/main/java/org/neo4j/driver/Session.java
@@ -19,7 +19,6 @@
package org.neo4j.driver;
import java.util.Map;
-import java.util.function.Consumer;
import org.neo4j.driver.async.AsyncSession;
import org.neo4j.driver.util.Resource;
@@ -85,20 +84,6 @@ public interface Session extends Resource, StatementRunner
*/
Transaction beginTransaction( TransactionConfig config );
- /**
- * Begin a new explicit {@linkplain Transaction transaction},
- * requiring that the server hosting is at least as up-to-date as the
- * transaction referenced by the supplied bookmark.
- *
- * @param bookmark a reference to a previous transaction
- * @return a new {@link Transaction}
- * @deprecated This method is deprecated in favour of {@link Driver#session(Consumer)} that accepts an initial
- * bookmark. Session will ensure that all nested transactions are chained with bookmarks to guarantee
- * causal consistency. This method will be removed in the next major release.
- */
- @Deprecated
- Transaction beginTransaction( String bookmark );
-
/**
* Execute given unit of work in a {@link AccessMode#READ read} transaction.
*
diff --git a/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionWork.java b/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionWork.java
index 838fcd6a81..456c9b0af2 100644
--- a/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionWork.java
+++ b/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionWork.java
@@ -18,10 +18,8 @@
*/
package org.neo4j.driver.async;
-import org.neo4j.driver.Transaction;
-
/**
- * Callback that executes operations against a given {@link Transaction}.
+ * Callback that executes operations against a given {@link AsyncTransaction}.
* To be used with {@link AsyncSession#readTransactionAsync(AsyncTransactionWork)} and
* {@link AsyncSession#writeTransactionAsync(AsyncTransactionWork)} (AsyncTransactionWork)} methods.
*
diff --git a/driver/src/main/java/org/neo4j/driver/internal/AbstractStatementRunner.java b/driver/src/main/java/org/neo4j/driver/internal/AbstractStatementRunner.java
index 6ee3d75dff..59abbf0101 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/AbstractStatementRunner.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/AbstractStatementRunner.java
@@ -19,22 +19,19 @@
package org.neo4j.driver.internal;
import java.util.Map;
-import java.util.concurrent.CompletionStage;
-import org.neo4j.driver.async.AsyncStatementRunner;
-import org.neo4j.driver.internal.types.InternalTypeSystem;
-import org.neo4j.driver.internal.util.Extract;
-import org.neo4j.driver.internal.value.MapValue;
import org.neo4j.driver.Record;
import org.neo4j.driver.Statement;
import org.neo4j.driver.StatementResult;
-import org.neo4j.driver.async.StatementResultCursor;
import org.neo4j.driver.StatementRunner;
import org.neo4j.driver.Value;
import org.neo4j.driver.Values;
+import org.neo4j.driver.internal.types.InternalTypeSystem;
+import org.neo4j.driver.internal.util.Extract;
+import org.neo4j.driver.internal.value.MapValue;
import org.neo4j.driver.types.TypeSystem;
-public abstract class AbstractStatementRunner implements StatementRunner, AsyncStatementRunner
+public abstract class AbstractStatementRunner implements StatementRunner
{
@Override
public final StatementResult run( String statementTemplate, Value parameters )
@@ -42,48 +39,24 @@ public final StatementResult run( String statementTemplate, Value parameters )
return run( new Statement( statementTemplate, parameters ) );
}
- @Override
- public final CompletionStage runAsync( String statementTemplate, Value parameters )
- {
- return runAsync( new Statement( statementTemplate, parameters ) );
- }
-
@Override
public final StatementResult run( String statementTemplate, Map statementParameters )
{
return run( statementTemplate, parameters( statementParameters ) );
}
- @Override
- public final CompletionStage runAsync( String statementTemplate, Map statementParameters )
- {
- return runAsync( statementTemplate, parameters( statementParameters ) );
- }
-
@Override
public final StatementResult run( String statementTemplate, Record statementParameters )
{
return run( statementTemplate, parameters( statementParameters ) );
}
- @Override
- public final CompletionStage runAsync( String statementTemplate, Record statementParameters )
- {
- return runAsync( statementTemplate, parameters( statementParameters ) );
- }
-
@Override
public final StatementResult run( String statementText )
{
return run( statementText, Values.EmptyMap );
}
- @Override
- public final CompletionStage runAsync( String statementText )
- {
- return runAsync( statementText, Values.EmptyMap );
- }
-
@Override
public final TypeSystem typeSystem()
{
diff --git a/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java b/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java
index dfdcccbbcc..329414c0b5 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java
@@ -21,7 +21,7 @@
import java.util.concurrent.CompletionStage;
import org.neo4j.driver.AccessMode;
-import org.neo4j.driver.internal.async.DecoratedConnection;
+import org.neo4j.driver.internal.async.connection.DecoratedConnection;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.ConnectionPool;
import org.neo4j.driver.internal.spi.ConnectionProvider;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
index e6872a6127..e83a167103 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
@@ -26,9 +26,9 @@
import java.net.URI;
import java.security.GeneralSecurityException;
-import org.neo4j.driver.internal.async.BootstrapFactory;
-import org.neo4j.driver.internal.async.ChannelConnector;
-import org.neo4j.driver.internal.async.ChannelConnectorImpl;
+import org.neo4j.driver.internal.async.connection.BootstrapFactory;
+import org.neo4j.driver.internal.async.connection.ChannelConnector;
+import org.neo4j.driver.internal.async.connection.ChannelConnectorImpl;
import org.neo4j.driver.internal.async.pool.ConnectionPoolImpl;
import org.neo4j.driver.internal.async.pool.PoolSettings;
import org.neo4j.driver.internal.cluster.DnsResolver;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java b/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java
index b625a6bdbf..9880cf2da4 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java
@@ -29,6 +29,8 @@
import org.neo4j.driver.Session;
import org.neo4j.driver.SessionParametersTemplate;
import org.neo4j.driver.async.AsyncSession;
+import org.neo4j.driver.internal.async.InternalAsyncSession;
+import org.neo4j.driver.internal.async.NetworkSession;
import org.neo4j.driver.internal.metrics.MetricsProvider;
import org.neo4j.driver.internal.reactive.InternalRxSession;
import org.neo4j.driver.internal.security.SecurityPlan;
@@ -57,7 +59,7 @@ public class InternalDriver implements Driver
@Override
public Session session()
{
- return newSession( SessionParameters.empty() );
+ return new InternalSession( newSession( SessionParameters.empty() ) );
}
@Override
@@ -65,7 +67,7 @@ public Session session( Consumer templateConsumer )
{
SessionParameters.Template template = SessionParameters.template();
templateConsumer.accept( template );
- return newSession( template.build() );
+ return new InternalSession( newSession( template.build() ) );
}
@Override
@@ -85,7 +87,7 @@ public RxSession rxSession( Consumer templateConsumer
@Override
public AsyncSession asyncSession()
{
- return newSession( SessionParameters.empty() );
+ return new InternalAsyncSession( newSession( SessionParameters.empty() ) );
}
@Override
@@ -93,7 +95,7 @@ public AsyncSession asyncSession( Consumer templateCo
{
SessionParameters.Template template = SessionParameters.template();
templateConsumer.accept( template );
- return newSession( template.build() );
+ return new InternalAsyncSession( newSession( template.build() ) );
}
@Override
@@ -148,7 +150,7 @@ private static RuntimeException driverCloseException()
return new IllegalStateException( "This driver instance has already been closed" );
}
- private NetworkSession newSession( SessionParameters parameters )
+ public NetworkSession newSession( SessionParameters parameters )
{
assertOpen();
NetworkSession session = sessionFactory.newInstance( parameters );
diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java b/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java
new file mode 100644
index 0000000000..5c937c8fa4
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java
@@ -0,0 +1,190 @@
+/*
+ * Copyright (c) 2002-2019 "Neo4j,"
+ * Neo4j Sweden AB [http://neo4j.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal;
+
+import java.util.Map;
+
+import org.neo4j.driver.AccessMode;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.Statement;
+import org.neo4j.driver.StatementResult;
+import org.neo4j.driver.Transaction;
+import org.neo4j.driver.TransactionConfig;
+import org.neo4j.driver.TransactionWork;
+import org.neo4j.driver.async.StatementResultCursor;
+import org.neo4j.driver.internal.async.ExplicitTransaction;
+import org.neo4j.driver.internal.async.NetworkSession;
+import org.neo4j.driver.internal.spi.Connection;
+import org.neo4j.driver.internal.util.Futures;
+
+import static java.util.Collections.emptyMap;
+
+public class InternalSession extends AbstractStatementRunner implements Session
+{
+ private final NetworkSession session;
+
+ public InternalSession( NetworkSession session )
+ {
+ this.session = session;
+ }
+
+ @Override
+ public StatementResult run( Statement statement )
+ {
+ return run( statement, TransactionConfig.empty() );
+ }
+
+ @Override
+ public StatementResult run( String statement, TransactionConfig config )
+ {
+ return run( statement, emptyMap(), config );
+ }
+
+ @Override
+ public StatementResult run( String statement, Map parameters, TransactionConfig config )
+ {
+ return run( new Statement( statement, parameters ), config );
+ }
+
+ @Override
+ public StatementResult run( Statement statement, TransactionConfig config )
+ {
+ StatementResultCursor cursor = Futures.blockingGet( session.runAsync( statement, config, false ),
+ () -> terminateConnectionOnThreadInterrupt( "Thread interrupted while running query in session" ) );
+
+ // query executed, it is safe to obtain a connection in a blocking way
+ Connection connection = Futures.getNow( session.connectionAsync() );
+ return new InternalStatementResult( connection, cursor );
+ }
+
+ @Override
+ public boolean isOpen()
+ {
+ return session.isOpen();
+ }
+
+ @Override
+ public void close()
+ {
+ Futures.blockingGet( session.closeAsync(), () -> terminateConnectionOnThreadInterrupt( "Thread interrupted while closing the session" ) );
+ }
+
+ @Override
+ public Transaction beginTransaction()
+ {
+ return beginTransaction( TransactionConfig.empty() );
+ }
+
+ @Override
+ public Transaction beginTransaction( TransactionConfig config )
+ {
+ ExplicitTransaction tx = Futures.blockingGet( session.beginTransactionAsync( config ),
+ () -> terminateConnectionOnThreadInterrupt( "Thread interrupted while starting a transaction" ) );
+ return new InternalTransaction( tx );
+ }
+
+ @Override
+ public T readTransaction( TransactionWork work )
+ {
+ return readTransaction( work, TransactionConfig.empty() );
+ }
+
+ @Override
+ public T readTransaction( TransactionWork work, TransactionConfig config )
+ {
+ return transaction( AccessMode.READ, work, config );
+ }
+
+ @Override
+ public T writeTransaction( TransactionWork work )
+ {
+ return writeTransaction( work, TransactionConfig.empty() );
+ }
+
+ @Override
+ public T writeTransaction( TransactionWork work, TransactionConfig config )
+ {
+ return transaction( AccessMode.WRITE, work, config );
+ }
+
+ @Override
+ public String lastBookmark()
+ {
+ return session.lastBookmark();
+ }
+
+ @Override
+ @SuppressWarnings( "deprecation" )
+ public void reset()
+ {
+ Futures.blockingGet( session.resetAsync(), () -> terminateConnectionOnThreadInterrupt( "Thread interrupted while resetting the session" ) );
+ }
+
+ private T transaction( AccessMode mode, TransactionWork work, TransactionConfig config )
+ {
+ // use different code path compared to async so that work is executed in the caller thread
+ // caller thread will also be the one who sleeps between retries;
+ // it is unsafe to execute retries in the event loop threads because this can cause a deadlock
+ // event loop thread will bock and wait for itself to read some data
+ return session.retryLogic().retry( () -> {
+ try ( Transaction tx = beginTransaction( mode, config ) )
+ {
+ try
+ {
+ T result = work.execute( tx );
+ tx.success();
+ return result;
+ }
+ catch ( Throwable t )
+ {
+ // mark transaction for failure if the given unit of work threw exception
+ // this will override any success marks that were made by the unit of work
+ tx.failure();
+ throw t;
+ }
+ }
+ } );
+ }
+
+ private Transaction beginTransaction( AccessMode mode, TransactionConfig config )
+ {
+ ExplicitTransaction tx = Futures.blockingGet( session.beginTransactionAsync( mode, config ),
+ () -> terminateConnectionOnThreadInterrupt( "Thread interrupted while starting a transaction" ) );
+ return new InternalTransaction( tx );
+ }
+
+ private void terminateConnectionOnThreadInterrupt( String reason )
+ {
+ // try to get current connection if it has been acquired
+ Connection connection = null;
+ try
+ {
+ connection = Futures.getNow( session.connectionAsync() );
+ }
+ catch ( Throwable ignore )
+ {
+ // ignore errors because handing interruptions is best effort
+ }
+
+ if ( connection != null )
+ {
+ connection.terminateAndRelease( reason );
+ }
+ }
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/InternalTransaction.java
new file mode 100644
index 0000000000..60d26b6947
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/InternalTransaction.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright (c) 2002-2019 "Neo4j,"
+ * Neo4j Sweden AB [http://neo4j.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal;
+
+import org.neo4j.driver.Statement;
+import org.neo4j.driver.StatementResult;
+import org.neo4j.driver.Transaction;
+import org.neo4j.driver.async.StatementResultCursor;
+import org.neo4j.driver.internal.async.ExplicitTransaction;
+import org.neo4j.driver.internal.util.Futures;
+
+public class InternalTransaction extends AbstractStatementRunner implements Transaction
+{
+ private final ExplicitTransaction tx;
+ public InternalTransaction( ExplicitTransaction tx )
+ {
+ this.tx = tx;
+ }
+
+ @Override
+ public void success()
+ {
+ tx.success();
+ }
+
+ @Override
+ public void failure()
+ {
+ tx.failure();
+ }
+
+ @Override
+ public void close()
+ {
+ Futures.blockingGet( tx.closeAsync(),
+ () -> terminateConnectionOnThreadInterrupt( "Thread interrupted while closing the transaction" ) );
+ }
+
+ @Override
+ public StatementResult run( Statement statement )
+ {
+ StatementResultCursor cursor = Futures.blockingGet( tx.runAsync( statement, false ),
+ () -> terminateConnectionOnThreadInterrupt( "Thread interrupted while running query in transaction" ) );
+ return new InternalStatementResult( tx.connection(), cursor );
+ }
+
+ @Override
+ public boolean isOpen()
+ {
+ return tx.isOpen();
+ }
+
+ private void terminateConnectionOnThreadInterrupt( String reason )
+ {
+ tx.connection().terminateAndRelease( reason );
+ }
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/SessionFactory.java b/driver/src/main/java/org/neo4j/driver/internal/SessionFactory.java
index d2f2657bd8..2863980644 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/SessionFactory.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/SessionFactory.java
@@ -20,6 +20,8 @@
import java.util.concurrent.CompletionStage;
+import org.neo4j.driver.internal.async.NetworkSession;
+
public interface SessionFactory
{
NetworkSession newInstance( SessionParameters parameters );
diff --git a/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java b/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java
index 5cb16520d7..37ee254375 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java
@@ -23,6 +23,8 @@
import org.neo4j.driver.AccessMode;
import org.neo4j.driver.Config;
import org.neo4j.driver.Logging;
+import org.neo4j.driver.internal.async.NetworkSession;
+import org.neo4j.driver.internal.async.LeakLoggingNetworkSession;
import org.neo4j.driver.internal.retry.RetryLogic;
import org.neo4j.driver.internal.spi.ConnectionProvider;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/AsyncAbstractStatementRunner.java b/driver/src/main/java/org/neo4j/driver/internal/async/AsyncAbstractStatementRunner.java
new file mode 100644
index 0000000000..d2a7df9ea4
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/AsyncAbstractStatementRunner.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2002-2019 "Neo4j,"
+ * Neo4j Sweden AB [http://neo4j.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.async;
+
+import java.util.Map;
+import java.util.concurrent.CompletionStage;
+
+import org.neo4j.driver.Record;
+import org.neo4j.driver.Statement;
+import org.neo4j.driver.Value;
+import org.neo4j.driver.Values;
+import org.neo4j.driver.async.AsyncStatementRunner;
+import org.neo4j.driver.async.StatementResultCursor;
+
+import static org.neo4j.driver.internal.AbstractStatementRunner.parameters;
+
+public abstract class AsyncAbstractStatementRunner implements AsyncStatementRunner
+{
+ @Override
+ public final CompletionStage runAsync( String statementTemplate, Value parameters )
+ {
+ return runAsync( new Statement( statementTemplate, parameters ) );
+ }
+
+ @Override
+ public final CompletionStage runAsync( String statementTemplate, Map statementParameters )
+ {
+ return runAsync( statementTemplate, parameters( statementParameters ) );
+ }
+
+ @Override
+ public final CompletionStage runAsync( String statementTemplate, Record statementParameters )
+ {
+ return runAsync( statementTemplate, parameters( statementParameters ) );
+ }
+
+ @Override
+ public final CompletionStage runAsync( String statementText )
+ {
+ return runAsync( statementText, Values.EmptyMap );
+ }
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/AsyncStatementResultCursor.java b/driver/src/main/java/org/neo4j/driver/internal/async/AsyncStatementResultCursor.java
similarity index 97%
rename from driver/src/main/java/org/neo4j/driver/internal/AsyncStatementResultCursor.java
rename to driver/src/main/java/org/neo4j/driver/internal/async/AsyncStatementResultCursor.java
index 9b306aa207..dd82c5f2cd 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/AsyncStatementResultCursor.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/AsyncStatementResultCursor.java
@@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.neo4j.driver.internal;
+package org.neo4j.driver.internal.async;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -27,7 +27,7 @@
import org.neo4j.driver.internal.handlers.PullAllResponseHandler;
import org.neo4j.driver.internal.handlers.RunResponseHandler;
import org.neo4j.driver.internal.util.Futures;
-import org.neo4j.driver.internal.reactive.cursor.InternalStatementResultCursor;
+import org.neo4j.driver.internal.cursor.InternalStatementResultCursor;
import org.neo4j.driver.Record;
import org.neo4j.driver.exceptions.NoSuchRecordException;
import org.neo4j.driver.summary.ResultSummary;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/async/ExplicitTransaction.java
similarity index 82%
rename from driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java
rename to driver/src/main/java/org/neo4j/driver/internal/async/ExplicitTransaction.java
index 30efd30958..fc1e848240 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/ExplicitTransaction.java
@@ -16,31 +16,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.neo4j.driver.internal;
+package org.neo4j.driver.internal.async;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
-import org.neo4j.driver.async.AsyncTransaction;
-import org.neo4j.driver.internal.async.ResultCursorsHolder;
-import org.neo4j.driver.internal.messaging.BoltProtocol;
-import org.neo4j.driver.internal.spi.Connection;
-import org.neo4j.driver.internal.util.Futures;
-import org.neo4j.driver.internal.reactive.cursor.InternalStatementResultCursor;
-import org.neo4j.driver.internal.reactive.cursor.RxStatementResultCursor;
import org.neo4j.driver.Session;
import org.neo4j.driver.Statement;
-import org.neo4j.driver.StatementResult;
-import org.neo4j.driver.async.StatementResultCursor;
-import org.neo4j.driver.Transaction;
import org.neo4j.driver.TransactionConfig;
+import org.neo4j.driver.async.StatementResultCursor;
import org.neo4j.driver.exceptions.ClientException;
+import org.neo4j.driver.internal.Bookmarks;
+import org.neo4j.driver.internal.BookmarksHolder;
+import org.neo4j.driver.internal.cursor.InternalStatementResultCursor;
+import org.neo4j.driver.internal.cursor.RxStatementResultCursor;
+import org.neo4j.driver.internal.messaging.BoltProtocol;
+import org.neo4j.driver.internal.spi.Connection;
+import org.neo4j.driver.internal.util.Futures;
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
import static org.neo4j.driver.internal.util.Futures.failedFuture;
-public class ExplicitTransaction extends AbstractStatementRunner implements Transaction, AsyncTransaction
+public class ExplicitTransaction
{
private enum State
{
@@ -96,7 +94,6 @@ public CompletionStage beginAsync( Bookmarks initialBookmar
} );
}
- @Override
public void success()
{
if ( state == State.ACTIVE )
@@ -105,7 +102,6 @@ public void success()
}
}
- @Override
public void failure()
{
if ( state == State.ACTIVE || state == State.MARKED_SUCCESS )
@@ -114,14 +110,7 @@ public void failure()
}
}
- @Override
- public void close()
- {
- Futures.blockingGet( closeAsync(),
- () -> terminateConnectionOnThreadInterrupt( "Thread interrupted while closing the transaction" ) );
- }
-
- CompletionStage closeAsync()
+ public CompletionStage closeAsync()
{
if ( state == State.MARKED_SUCCESS )
{
@@ -137,7 +126,6 @@ else if ( state != State.COMMITTED && state != State.ROLLED_BACK )
}
}
- @Override
public CompletionStage commitAsync()
{
if ( state == State.COMMITTED )
@@ -156,7 +144,6 @@ else if ( state == State.ROLLED_BACK )
}
}
- @Override
public CompletionStage rollbackAsync()
{
if ( state == State.COMMITTED )
@@ -175,28 +162,13 @@ else if ( state == State.ROLLED_BACK )
}
}
- @Override
- public StatementResult run( Statement statement )
- {
- StatementResultCursor cursor = Futures.blockingGet( run( statement, false ),
- () -> terminateConnectionOnThreadInterrupt( "Thread interrupted while running query in transaction" ) );
- return new InternalStatementResult( connection, cursor );
- }
-
- @Override
- public CompletionStage runAsync( Statement statement )
- {
- //noinspection unchecked
- return (CompletionStage) run( statement, true );
- }
-
- private CompletionStage run( Statement statement, boolean waitForRunResponse )
+ public CompletionStage runAsync( Statement statement, boolean waitForRunResponse )
{
ensureCanRunQueries();
CompletionStage cursorStage =
protocol.runInExplicitTransaction( connection, statement, this, waitForRunResponse ).asyncResult();
resultCursors.add( cursorStage );
- return cursorStage;
+ return cursorStage.thenApply( cursor -> cursor );
}
public CompletionStage runRx( Statement statement )
@@ -208,6 +180,21 @@ public CompletionStage runRx( Statement statement )
return cursorStage;
}
+ public boolean isOpen()
+ {
+ return state != State.COMMITTED && state != State.ROLLED_BACK;
+ }
+
+ public void markTerminated()
+ {
+ state = State.TERMINATED;
+ }
+
+ public Connection connection()
+ {
+ return connection;
+ }
+
private void ensureCanRunQueries()
{
if ( state == State.COMMITTED )
@@ -221,26 +208,15 @@ else if ( state == State.ROLLED_BACK )
else if ( state == State.MARKED_FAILED )
{
throw new ClientException( "Cannot run more statements in this transaction, it has been marked for failure. " +
- "Please either rollback or close this transaction" );
+ "Please either rollback or close this transaction" );
}
else if ( state == State.TERMINATED )
{
throw new ClientException( "Cannot run more statements in this transaction, " +
- "it has either experienced an fatal error or was explicitly terminated" );
+ "it has either experienced an fatal error or was explicitly terminated" );
}
}
- @Override
- public boolean isOpen()
- {
- return state != State.COMMITTED && state != State.ROLLED_BACK;
- }
-
- public void markTerminated()
- {
- state = State.TERMINATED;
- }
-
private CompletionStage doCommitAsync()
{
if ( state == State.TERMINATED )
@@ -285,9 +261,4 @@ private void transactionClosed( boolean isCommitted )
}
connection.release(); // release in background
}
-
- private void terminateConnectionOnThreadInterrupt( String reason )
- {
- connection.terminateAndRelease( reason );
- }
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java b/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java
new file mode 100644
index 0000000000..7dc5837fba
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java
@@ -0,0 +1,216 @@
+/*
+ * Copyright (c) 2002-2019 "Neo4j,"
+ * Neo4j Sweden AB [http://neo4j.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.async;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+import org.neo4j.driver.AccessMode;
+import org.neo4j.driver.Statement;
+import org.neo4j.driver.TransactionConfig;
+import org.neo4j.driver.async.AsyncSession;
+import org.neo4j.driver.async.AsyncTransaction;
+import org.neo4j.driver.async.AsyncTransactionWork;
+import org.neo4j.driver.async.StatementResultCursor;
+import org.neo4j.driver.internal.util.Futures;
+
+import static java.util.Collections.emptyMap;
+import static org.neo4j.driver.internal.util.Futures.completedWithNull;
+import static org.neo4j.driver.internal.util.Futures.failedFuture;
+
+public class InternalAsyncSession extends AsyncAbstractStatementRunner implements AsyncSession
+{
+ private final NetworkSession session;
+
+ public InternalAsyncSession( NetworkSession session )
+ {
+ this.session = session;
+ }
+
+ @Override
+ public CompletionStage runAsync( Statement statement )
+ {
+ return runAsync( statement, TransactionConfig.empty() );
+ }
+
+ @Override
+ public CompletionStage runAsync( String statement, TransactionConfig config )
+ {
+ return runAsync( statement, emptyMap(), config );
+ }
+
+ @Override
+ public CompletionStage runAsync( String statement, Map parameters, TransactionConfig config )
+ {
+ return runAsync( new Statement( statement, parameters ), config );
+ }
+
+ @Override
+ public CompletionStage runAsync( Statement statement, TransactionConfig config )
+ {
+ return session.runAsync( statement, config, true );
+ }
+
+ @Override
+ public CompletionStage closeAsync()
+ {
+ return session.closeAsync();
+ }
+
+ @Override
+ public CompletionStage beginTransactionAsync()
+ {
+ return beginTransactionAsync( TransactionConfig.empty() );
+ }
+
+ @Override
+ public CompletionStage beginTransactionAsync( TransactionConfig config )
+ {
+ return session.beginTransactionAsync( config ).thenApply( InternalAsyncTransaction::new );
+ }
+
+ @Override
+ public CompletionStage readTransactionAsync( AsyncTransactionWork> work )
+ {
+ return readTransactionAsync( work, TransactionConfig.empty() );
+ }
+
+ @Override
+ public CompletionStage readTransactionAsync( AsyncTransactionWork> work, TransactionConfig config )
+ {
+ return transactionAsync( AccessMode.READ, work, config );
+ }
+
+ @Override
+ public CompletionStage writeTransactionAsync( AsyncTransactionWork> work )
+ {
+ return writeTransactionAsync( work, TransactionConfig.empty() );
+ }
+
+ @Override
+ public CompletionStage writeTransactionAsync( AsyncTransactionWork> work, TransactionConfig config )
+ {
+ return transactionAsync( AccessMode.WRITE, work, config );
+ }
+
+ @Override
+ public String lastBookmark()
+ {
+ return session.lastBookmark();
+ }
+
+ private CompletionStage transactionAsync( AccessMode mode, AsyncTransactionWork> work, TransactionConfig config )
+ {
+ return session.retryLogic().retryAsync( () -> {
+ CompletableFuture resultFuture = new CompletableFuture<>();
+ CompletionStage txFuture = session.beginTransactionAsync( mode, config );
+
+ txFuture.whenComplete( ( tx, completionError ) -> {
+ Throwable error = Futures.completionExceptionCause( completionError );
+ if ( error != null )
+ {
+ resultFuture.completeExceptionally( error );
+ }
+ else
+ {
+ executeWork( resultFuture, tx, work );
+ }
+ } );
+
+ return resultFuture;
+ } );
+ }
+
+ private void executeWork( CompletableFuture resultFuture, ExplicitTransaction tx, AsyncTransactionWork> work )
+ {
+ CompletionStage workFuture = safeExecuteWork( tx, work );
+ workFuture.whenComplete( ( result, completionError ) -> {
+ Throwable error = Futures.completionExceptionCause( completionError );
+ if ( error != null )
+ {
+ rollbackTxAfterFailedTransactionWork( tx, resultFuture, error );
+ }
+ else
+ {
+ closeTxAfterSucceededTransactionWork( tx, resultFuture, result );
+ }
+ } );
+ }
+
+ private CompletionStage safeExecuteWork( ExplicitTransaction tx, AsyncTransactionWork> work )
+ {
+ // given work might fail in both async and sync way
+ // async failure will result in a failed future being returned
+ // sync failure will result in an exception being thrown
+ try
+ {
+ CompletionStage result = work.execute( new InternalAsyncTransaction( tx ) );
+
+ // protect from given transaction function returning null
+ return result == null ? completedWithNull() : result;
+ }
+ catch ( Throwable workError )
+ {
+ // work threw an exception, wrap it in a future and proceed
+ return failedFuture( workError );
+ }
+ }
+
+ private void rollbackTxAfterFailedTransactionWork( ExplicitTransaction tx, CompletableFuture resultFuture, Throwable error )
+ {
+ if ( tx.isOpen() )
+ {
+ tx.rollbackAsync().whenComplete( ( ignore, rollbackError ) -> {
+ if ( rollbackError != null )
+ {
+ error.addSuppressed( rollbackError );
+ }
+ resultFuture.completeExceptionally( error );
+ } );
+ }
+ else
+ {
+ resultFuture.completeExceptionally( error );
+ }
+ }
+
+ private void closeTxAfterSucceededTransactionWork( ExplicitTransaction tx, CompletableFuture resultFuture, T result )
+ {
+ if ( tx.isOpen() )
+ {
+ tx.success();
+ tx.closeAsync().whenComplete( ( ignore, completionError ) -> {
+ Throwable commitError = Futures.completionExceptionCause( completionError );
+ if ( commitError != null )
+ {
+ resultFuture.completeExceptionally( commitError );
+ }
+ else
+ {
+ resultFuture.complete( result );
+ }
+ } );
+ }
+ else
+ {
+ resultFuture.complete( result );
+ }
+ }
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncTransaction.java
new file mode 100644
index 0000000000..75582f6bf4
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncTransaction.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2002-2019 "Neo4j,"
+ * Neo4j Sweden AB [http://neo4j.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.async;
+
+import java.util.concurrent.CompletionStage;
+
+import org.neo4j.driver.Statement;
+import org.neo4j.driver.async.AsyncTransaction;
+import org.neo4j.driver.async.StatementResultCursor;
+
+public class InternalAsyncTransaction extends AsyncAbstractStatementRunner implements AsyncTransaction
+{
+ private final ExplicitTransaction tx;
+ public InternalAsyncTransaction( ExplicitTransaction tx )
+ {
+ this.tx = tx;
+ }
+
+ @Override
+ public CompletionStage commitAsync()
+ {
+ return tx.commitAsync();
+ }
+
+ @Override
+ public CompletionStage rollbackAsync()
+ {
+ return tx.rollbackAsync();
+ }
+
+ @Override
+ public CompletionStage runAsync( Statement statement )
+ {
+ return tx.runAsync( statement, true );
+ }
+
+ public void markTerminated()
+ {
+ tx.markTerminated();
+ }
+
+ public boolean isOpen()
+ {
+ return tx.isOpen();
+ }
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/async/LeakLoggingNetworkSession.java
similarity index 88%
rename from driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java
rename to driver/src/main/java/org/neo4j/driver/internal/async/LeakLoggingNetworkSession.java
index 3a93085aaa..c50f013069 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/LeakLoggingNetworkSession.java
@@ -16,21 +16,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.neo4j.driver.internal;
+package org.neo4j.driver.internal.async;
+import org.neo4j.driver.AccessMode;
+import org.neo4j.driver.Logging;
+import org.neo4j.driver.internal.BookmarksHolder;
import org.neo4j.driver.internal.retry.RetryLogic;
import org.neo4j.driver.internal.spi.ConnectionProvider;
import org.neo4j.driver.internal.util.Futures;
-import org.neo4j.driver.AccessMode;
-import org.neo4j.driver.Logging;
import static java.lang.System.lineSeparator;
-class LeakLoggingNetworkSession extends NetworkSession
+public class LeakLoggingNetworkSession extends NetworkSession
{
private final String stackTrace;
- LeakLoggingNetworkSession( ConnectionProvider connectionProvider, RetryLogic retryLogic, String databaseName, AccessMode mode,
+ public LeakLoggingNetworkSession( ConnectionProvider connectionProvider, RetryLogic retryLogic, String databaseName, AccessMode mode,
BookmarksHolder bookmarksHolder, Logging logging )
{
super( connectionProvider, retryLogic, databaseName, mode, bookmarksHolder, logging );
diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java
similarity index 51%
rename from driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java
rename to driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java
index 6977b357ff..09c21d99e2 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java
@@ -16,10 +16,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.neo4j.driver.internal;
+package org.neo4j.driver.internal.async;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -27,32 +25,25 @@
import org.neo4j.driver.AccessMode;
import org.neo4j.driver.Logger;
import org.neo4j.driver.Logging;
-import org.neo4j.driver.Session;
import org.neo4j.driver.Statement;
-import org.neo4j.driver.StatementResult;
-import org.neo4j.driver.Transaction;
import org.neo4j.driver.TransactionConfig;
-import org.neo4j.driver.TransactionWork;
-import org.neo4j.driver.async.AsyncSession;
-import org.neo4j.driver.async.AsyncTransaction;
-import org.neo4j.driver.async.AsyncTransactionWork;
import org.neo4j.driver.async.StatementResultCursor;
import org.neo4j.driver.exceptions.ClientException;
+import org.neo4j.driver.internal.BookmarksHolder;
+import org.neo4j.driver.internal.FailableCursor;
+import org.neo4j.driver.internal.cursor.InternalStatementResultCursor;
+import org.neo4j.driver.internal.cursor.RxStatementResultCursor;
+import org.neo4j.driver.internal.cursor.StatementResultCursorFactory;
import org.neo4j.driver.internal.logging.PrefixedLogger;
-import org.neo4j.driver.internal.reactive.cursor.InternalStatementResultCursor;
-import org.neo4j.driver.internal.reactive.cursor.RxStatementResultCursor;
-import org.neo4j.driver.internal.reactive.cursor.StatementResultCursorFactory;
import org.neo4j.driver.internal.retry.RetryLogic;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.ConnectionProvider;
import org.neo4j.driver.internal.util.Futures;
-import static java.util.Collections.emptyMap;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
-import static org.neo4j.driver.internal.util.Futures.failedFuture;
-public class NetworkSession extends AbstractStatementRunner implements Session, AsyncSession
+public class NetworkSession
{
private static final String LOG_NAME = "Session";
@@ -69,8 +60,8 @@ public class NetworkSession extends AbstractStatementRunner implements Session,
private final AtomicBoolean open = new AtomicBoolean( true );
- public NetworkSession( ConnectionProvider connectionProvider, RetryLogic retryLogic, String databaseName, AccessMode mode, BookmarksHolder bookmarksHolder,
- Logging logging )
+ public NetworkSession( ConnectionProvider connectionProvider, RetryLogic retryLogic, String databaseName, AccessMode mode,
+ BookmarksHolder bookmarksHolder, Logging logging )
{
this.connectionProvider = connectionProvider;
this.mode = mode;
@@ -80,195 +71,59 @@ public NetworkSession( ConnectionProvider connectionProvider, RetryLogic retryLo
this.databaseName = databaseName;
}
- @Override
- public StatementResult run( Statement statement )
+ public CompletionStage runAsync( Statement statement, TransactionConfig config, boolean waitForRunResponse )
{
- return run( statement, TransactionConfig.empty() );
- }
-
- @Override
- public StatementResult run( String statement, TransactionConfig config )
- {
- return run( statement, emptyMap(), config );
- }
-
- @Override
- public StatementResult run( String statement, Map parameters, TransactionConfig config )
- {
- return run( new Statement( statement, parameters ), config );
- }
-
- @Override
- public StatementResult run( Statement statement, TransactionConfig config )
- {
- StatementResultCursor cursor = Futures.blockingGet( run( statement, config, false ),
- () -> terminateConnectionOnThreadInterrupt( "Thread interrupted while running query in session" ) );
-
- // query executed, it is safe to obtain a connection in a blocking way
- Connection connection = Futures.getNow( connectionStage );
- return new InternalStatementResult( connection, cursor );
- }
-
- @Override
- public CompletionStage runAsync( Statement statement )
- {
- return runAsync( statement, TransactionConfig.empty() );
- }
+ CompletionStage newResultCursorStage =
+ buildResultCursorFactory( statement, config, waitForRunResponse ).thenCompose( StatementResultCursorFactory::asyncResult );
- @Override
- public CompletionStage runAsync( String statement, TransactionConfig config )
- {
- return runAsync( statement, emptyMap(), config );
+ resultCursorStage = newResultCursorStage.exceptionally( error -> null );
+ return newResultCursorStage.thenApply( cursor -> cursor ); // convert the return type
}
- @Override
- public CompletionStage runAsync( String statement, Map parameters, TransactionConfig config )
+ public CompletionStage runRx( Statement statement, TransactionConfig config )
{
- return runAsync( new Statement( statement, parameters ), config );
- }
+ CompletionStage newResultCursorStage =
+ buildResultCursorFactory( statement, config, true ).thenCompose( StatementResultCursorFactory::rxResult );
- @Override
- public CompletionStage runAsync( Statement statement, TransactionConfig config )
- {
- //noinspection unchecked
- return (CompletionStage) run( statement, config, true );
+ resultCursorStage = newResultCursorStage.exceptionally( error -> null );
+ return newResultCursorStage;
}
- @Override
- public boolean isOpen()
+ public CompletionStage beginTransactionAsync( TransactionConfig config )
{
- return open.get();
+ return this.beginTransactionAsync( mode, config );
}
- @Override
- public void close()
+ public CompletionStage beginTransactionAsync( AccessMode mode, TransactionConfig config )
{
- Futures.blockingGet( closeAsync(),
- () -> terminateConnectionOnThreadInterrupt( "Thread interrupted while closing the session" ) );
- }
+ ensureSessionIsOpen();
- @Override
- public CompletionStage closeAsync()
- {
- if ( open.compareAndSet( true, false ) )
- {
- return resultCursorStage.thenCompose( cursor ->
- {
- if ( cursor != null )
- {
- // there exists a cursor with potentially unconsumed error, try to extract and propagate it
- return cursor.failureAsync();
- }
- // no result cursor exists so no error exists
- return completedWithNull();
- } ).thenCompose( cursorError -> closeTransactionAndReleaseConnection().thenApply( txCloseError ->
- {
- // now we have cursor error, active transaction has been closed and connection has been released
- // back to the pool; try to propagate cursor and transaction close errors, if any
- CompletionException combinedError = Futures.combineErrors( cursorError, txCloseError );
- if ( combinedError != null )
+ // create a chain that acquires connection and starts a transaction
+ CompletionStage newTransactionStage = ensureNoOpenTxBeforeStartingTx()
+ .thenCompose( ignore -> acquireConnection( databaseName, mode ) )
+ .thenCompose( connection ->
{
- throw combinedError;
- }
- return null;
- } ) );
- }
- return completedWithNull();
- }
-
- @Override
- public Transaction beginTransaction()
- {
- return beginTransaction( TransactionConfig.empty() );
- }
-
- @Override
- public Transaction beginTransaction( TransactionConfig config )
- {
- return beginTransaction( mode, config );
- }
-
- @Deprecated
- @Override
- public Transaction beginTransaction( String bookmark )
- {
- bookmarksHolder.setBookmarks( Bookmarks.from( bookmark ) );
- return beginTransaction();
- }
-
- @Override
- public CompletionStage beginTransactionAsync()
- {
- return beginTransactionAsync( TransactionConfig.empty() );
- }
-
- @Override
- public CompletionStage beginTransactionAsync( TransactionConfig config )
- {
- //noinspection unchecked
- return (CompletionStage) beginTransactionAsync( mode, config );
- }
-
- @Override
- public T readTransaction( TransactionWork work )
- {
- return readTransaction( work, TransactionConfig.empty() );
- }
-
- @Override
- public T readTransaction( TransactionWork work, TransactionConfig config )
- {
- return transaction( AccessMode.READ, work, config );
- }
-
- @Override
- public CompletionStage readTransactionAsync( AsyncTransactionWork> work )
- {
- return readTransactionAsync( work, TransactionConfig.empty() );
- }
-
- @Override
- public CompletionStage readTransactionAsync( AsyncTransactionWork> work, TransactionConfig config )
- {
- return transactionAsync( AccessMode.READ, work, config );
- }
-
- @Override
- public T writeTransaction( TransactionWork work )
- {
- return writeTransaction( work, TransactionConfig.empty() );
- }
-
- @Override
- public T writeTransaction( TransactionWork work, TransactionConfig config )
- {
- return transaction( AccessMode.WRITE, work, config );
- }
-
- @Override
- public CompletionStage writeTransactionAsync( AsyncTransactionWork> work )
- {
- return writeTransactionAsync( work, TransactionConfig.empty() );
- }
+ ExplicitTransaction tx = new ExplicitTransaction( connection, bookmarksHolder );
+ return tx.beginAsync( bookmarksHolder.getBookmarks(), config );
+ } );
- @Override
- public CompletionStage writeTransactionAsync( AsyncTransactionWork> work, TransactionConfig config )
- {
- return transactionAsync( AccessMode.WRITE, work, config );
- }
+ // update the reference to the only known transaction
+ CompletionStage currentTransactionStage = transactionStage;
- @Override
- public String lastBookmark()
- {
- return bookmarksHolder.lastBookmark();
- }
+ transactionStage = newTransactionStage
+ .exceptionally( error -> null ) // ignore errors from starting new transaction
+ .thenCompose( tx ->
+ {
+ if ( tx == null )
+ {
+ // failed to begin new transaction, keep reference to the existing one
+ return currentTransactionStage;
+ }
+ // new transaction started, keep reference to it
+ return completedFuture( tx );
+ } );
- @Override
- @SuppressWarnings( "deprecation" )
- public void reset()
- {
- Futures.blockingGet( resetAsync(),
- () -> terminateConnectionOnThreadInterrupt( "Thread interrupted while resetting the session" ) );
+ return newTransactionStage;
}
public CompletionStage resetAsync()
@@ -293,163 +148,74 @@ public CompletionStage resetAsync()
} );
}
- CompletionStage currentConnectionIsOpen()
- {
- return connectionStage.handle( ( connection, error ) ->
- error == null && // no acquisition error
- connection != null && // some connection has actually been acquired
- connection.isOpen() ); // and it's still open
- }
-
- private T transaction( AccessMode mode, TransactionWork work, TransactionConfig config )
+ public RetryLogic retryLogic()
{
- // use different code path compared to async so that work is executed in the caller thread
- // caller thread will also be the one who sleeps between retries;
- // it is unsafe to execute retries in the event loop threads because this can cause a deadlock
- // event loop thread will bock and wait for itself to read some data
- return retryLogic.retry( () ->
- {
- try ( Transaction tx = beginTransaction( mode, config ) )
- {
- try
- {
- T result = work.execute( tx );
- tx.success();
- return result;
- }
- catch ( Throwable t )
- {
- // mark transaction for failure if the given unit of work threw exception
- // this will override any success marks that were made by the unit of work
- tx.failure();
- throw t;
- }
- }
- } );
+ return retryLogic;
}
- private CompletionStage transactionAsync( AccessMode mode, AsyncTransactionWork> work, TransactionConfig config )
+ public String lastBookmark()
{
- return retryLogic.retryAsync( () ->
- {
- CompletableFuture resultFuture = new CompletableFuture<>();
- CompletionStage txFuture = beginTransactionAsync( mode, config );
-
- txFuture.whenComplete( ( tx, completionError ) ->
- {
- Throwable error = Futures.completionExceptionCause( completionError );
- if ( error != null )
- {
- resultFuture.completeExceptionally( error );
- }
- else
- {
- executeWork( resultFuture, tx, work );
- }
- } );
-
- return resultFuture;
- } );
+ return bookmarksHolder.lastBookmark();
}
- private void executeWork( CompletableFuture resultFuture, ExplicitTransaction tx,
- AsyncTransactionWork> work )
+ public CompletionStage releaseConnectionAsync()
{
- CompletionStage workFuture = safeExecuteWork( tx, work );
- workFuture.whenComplete( ( result, completionError ) ->
+ return connectionStage.thenCompose( connection ->
{
- Throwable error = Futures.completionExceptionCause( completionError );
- if ( error != null )
- {
- rollbackTxAfterFailedTransactionWork( tx, resultFuture, error );
- }
- else
+ if ( connection != null )
{
- closeTxAfterSucceededTransactionWork( tx, resultFuture, result );
+ // there exists connection, try to release it back to the pool
+ return connection.release();
}
+ // no connection so return null
+ return completedWithNull();
} );
}
- private CompletionStage safeExecuteWork( ExplicitTransaction tx, AsyncTransactionWork> work )
+ public CompletionStage connectionAsync()
{
- // given work might fail in both async and sync way
- // async failure will result in a failed future being returned
- // sync failure will result in an exception being thrown
- try
- {
- CompletionStage result = work.execute( tx );
-
- // protect from given transaction function returning null
- return result == null ? completedWithNull() : result;
- }
- catch ( Throwable workError )
- {
- // work threw an exception, wrap it in a future and proceed
- return failedFuture( workError );
- }
+ return connectionStage;
}
- private void rollbackTxAfterFailedTransactionWork( ExplicitTransaction tx, CompletableFuture resultFuture,
- Throwable error )
+ public boolean isOpen()
{
- if ( tx.isOpen() )
- {
- tx.rollbackAsync().whenComplete( ( ignore, rollbackError ) ->
- {
- if ( rollbackError != null )
- {
- error.addSuppressed( rollbackError );
- }
- resultFuture.completeExceptionally( error );
- } );
- }
- else
- {
- resultFuture.completeExceptionally( error );
- }
+ return open.get();
}
- private void closeTxAfterSucceededTransactionWork( ExplicitTransaction tx, CompletableFuture resultFuture,
- T result )
+ public CompletionStage closeAsync()
{
- if ( tx.isOpen() )
+ if ( open.compareAndSet( true, false ) )
{
- tx.success();
- tx.closeAsync().whenComplete( ( ignore, completionError ) ->
+ return resultCursorStage.thenCompose( cursor ->
{
- Throwable commitError = Futures.completionExceptionCause( completionError );
- if ( commitError != null )
+ if ( cursor != null )
{
- resultFuture.completeExceptionally( commitError );
+ // there exists a cursor with potentially unconsumed error, try to extract and propagate it
+ return cursor.failureAsync();
}
- else
+ // no result cursor exists so no error exists
+ return completedWithNull();
+ } ).thenCompose( cursorError -> closeTransactionAndReleaseConnection().thenApply( txCloseError ->
+ {
+ // now we have cursor error, active transaction has been closed and connection has been released
+ // back to the pool; try to propagate cursor and transaction close errors, if any
+ CompletionException combinedError = Futures.combineErrors( cursorError, txCloseError );
+ if ( combinedError != null )
{
- resultFuture.complete( result );
+ throw combinedError;
}
- } );
- }
- else
- {
- resultFuture.complete( result );
+ return null;
+ } ) );
}
+ return completedWithNull();
}
- public CompletionStage runRx( Statement statement, TransactionConfig config )
- {
- CompletionStage newResultCursorStage =
- buildResultCursorFactory( statement, config, true ).thenCompose( StatementResultCursorFactory::rxResult );
-
- resultCursorStage = newResultCursorStage.exceptionally( error -> null );
- return newResultCursorStage;
- }
-
- private CompletionStage run( Statement statement, TransactionConfig config, boolean waitForRunResponse )
+ protected CompletionStage currentConnectionIsOpen()
{
- CompletionStage newResultCursorStage =
- buildResultCursorFactory( statement, config, waitForRunResponse ).thenCompose( StatementResultCursorFactory::asyncResult );
-
- resultCursorStage = newResultCursorStage.exceptionally( error -> null );
- return newResultCursorStage;
+ return connectionStage.handle( ( connection, error ) ->
+ error == null && // no acquisition error
+ connection != null && // some connection has actually been acquired
+ connection.isOpen() ); // and it's still open
}
private CompletionStage buildResultCursorFactory( Statement statement, TransactionConfig config, boolean waitForRunResponse )
@@ -472,44 +238,6 @@ private CompletionStage buildResultCursorFactory(
} );
}
- private Transaction beginTransaction( AccessMode mode, TransactionConfig config )
- {
- return Futures.blockingGet( beginTransactionAsync( mode, config ),
- () -> terminateConnectionOnThreadInterrupt( "Thread interrupted while starting a transaction" ) );
- }
-
- private CompletionStage beginTransactionAsync( AccessMode mode, TransactionConfig config )
- {
- ensureSessionIsOpen();
-
- // create a chain that acquires connection and starts a transaction
- CompletionStage newTransactionStage = ensureNoOpenTxBeforeStartingTx()
- .thenCompose( ignore -> acquireConnection( databaseName, mode ) )
- .thenCompose( connection ->
- {
- ExplicitTransaction tx = new ExplicitTransaction( connection, bookmarksHolder );
- return tx.beginAsync( bookmarksHolder.getBookmarks(), config );
- } );
-
- // update the reference to the only known transaction
- CompletionStage currentTransactionStage = transactionStage;
-
- transactionStage = newTransactionStage
- .exceptionally( error -> null ) // ignore errors from starting new transaction
- .thenCompose( tx ->
- {
- if ( tx == null )
- {
- // failed to begin new transaction, keep reference to the existing one
- return currentTransactionStage;
- }
- // new transaction started, keep reference to it
- return completedFuture( tx );
- } );
-
- return newTransactionStage;
- }
-
private CompletionStage acquireConnection( String databaseName, AccessMode mode )
{
CompletionStage currentConnectionStage = connectionStage;
@@ -569,40 +297,7 @@ private CompletionStage closeTransactionAndReleaseConnection()
return completedWithNull();
} ).thenCompose( txCloseError ->
// then release the connection and propagate transaction close error, if any
- releaseConnection().thenApply( ignore -> txCloseError ) );
- }
-
- public CompletionStage releaseConnection()
- {
- return connectionStage.thenCompose( connection ->
- {
- if ( connection != null )
- {
- // there exists connection, try to release it back to the pool
- return connection.release();
- }
- // no connection so return null
- return completedWithNull();
- } );
- }
-
- private void terminateConnectionOnThreadInterrupt( String reason )
- {
- // try to get current connection if it has been acquired
- Connection connection = null;
- try
- {
- connection = Futures.getNow( connectionStage );
- }
- catch ( Throwable ignore )
- {
- // ignore errors because handing interruptions is best effort
- }
-
- if ( connection != null )
- {
- connection.terminateAndRelease( reason );
- }
+ releaseConnectionAsync().thenApply( ignore -> txCloseError ) );
}
private CompletionStage ensureNoOpenTxBeforeRunningQuery()
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/ResultCursorsHolder.java b/driver/src/main/java/org/neo4j/driver/internal/async/ResultCursorsHolder.java
index 2353b13013..319f576fac 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/ResultCursorsHolder.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/ResultCursorsHolder.java
@@ -38,7 +38,7 @@ public void add( CompletionStage extends FailableCursor> cursorStage )
cursorStages.add( cursorStage );
}
- public CompletionStage retrieveNotConsumedError()
+ CompletionStage retrieveNotConsumedError()
{
CompletableFuture[] failures = retrieveAllFailures();
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/BoltProtocolUtil.java b/driver/src/main/java/org/neo4j/driver/internal/async/connection/BoltProtocolUtil.java
similarity index 98%
rename from driver/src/main/java/org/neo4j/driver/internal/async/BoltProtocolUtil.java
rename to driver/src/main/java/org/neo4j/driver/internal/async/connection/BoltProtocolUtil.java
index 1e9001b37a..8602180966 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/BoltProtocolUtil.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/connection/BoltProtocolUtil.java
@@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.neo4j.driver.internal.async;
+package org.neo4j.driver.internal.async.connection;
import io.netty.buffer.ByteBuf;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/BootstrapFactory.java b/driver/src/main/java/org/neo4j/driver/internal/async/connection/BootstrapFactory.java
similarity index 96%
rename from driver/src/main/java/org/neo4j/driver/internal/async/BootstrapFactory.java
rename to driver/src/main/java/org/neo4j/driver/internal/async/connection/BootstrapFactory.java
index 73d355d222..3262fba331 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/BootstrapFactory.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/connection/BootstrapFactory.java
@@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.neo4j.driver.internal.async;
+package org.neo4j.driver.internal.async.connection;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelOption;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/ChannelAttributes.java b/driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelAttributes.java
similarity index 98%
rename from driver/src/main/java/org/neo4j/driver/internal/async/ChannelAttributes.java
rename to driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelAttributes.java
index 628c99d57f..784f82b645 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/ChannelAttributes.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelAttributes.java
@@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.neo4j.driver.internal.async;
+package org.neo4j.driver.internal.async.connection;
import io.netty.channel.Channel;
import io.netty.util.AttributeKey;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectedListener.java b/driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelConnectedListener.java
similarity index 92%
rename from driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectedListener.java
rename to driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelConnectedListener.java
index 164bd06f88..3a11804401 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectedListener.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelConnectedListener.java
@@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.neo4j.driver.internal.async;
+package org.neo4j.driver.internal.async.connection;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@@ -31,8 +31,8 @@
import org.neo4j.driver.exceptions.ServiceUnavailableException;
import static java.lang.String.format;
-import static org.neo4j.driver.internal.async.BoltProtocolUtil.handshakeBuf;
-import static org.neo4j.driver.internal.async.BoltProtocolUtil.handshakeString;
+import static org.neo4j.driver.internal.async.connection.BoltProtocolUtil.handshakeBuf;
+import static org.neo4j.driver.internal.async.connection.BoltProtocolUtil.handshakeString;
public class ChannelConnectedListener implements ChannelFutureListener
{
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnector.java b/driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelConnector.java
similarity index 94%
rename from driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnector.java
rename to driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelConnector.java
index aaff221c92..6ebf0b42d3 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnector.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelConnector.java
@@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.neo4j.driver.internal.async;
+package org.neo4j.driver.internal.async.connection;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectorImpl.java b/driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelConnectorImpl.java
similarity index 99%
rename from driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectorImpl.java
rename to driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelConnectorImpl.java
index cbd257991c..76bde81a0b 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectorImpl.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelConnectorImpl.java
@@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.neo4j.driver.internal.async;
+package org.neo4j.driver.internal.async.connection;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/ChannelPipelineBuilder.java b/driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelPipelineBuilder.java
similarity index 94%
rename from driver/src/main/java/org/neo4j/driver/internal/async/ChannelPipelineBuilder.java
rename to driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelPipelineBuilder.java
index 4fb537e678..9c7c2d9726 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/ChannelPipelineBuilder.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelPipelineBuilder.java
@@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.neo4j.driver.internal.async;
+package org.neo4j.driver.internal.async.connection;
import io.netty.channel.ChannelPipeline;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/ChannelPipelineBuilderImpl.java b/driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelPipelineBuilderImpl.java
similarity index 97%
rename from driver/src/main/java/org/neo4j/driver/internal/async/ChannelPipelineBuilderImpl.java
rename to driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelPipelineBuilderImpl.java
index ea7149e0a0..98535fdbef 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/ChannelPipelineBuilderImpl.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelPipelineBuilderImpl.java
@@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.neo4j.driver.internal.async;
+package org.neo4j.driver.internal.async.connection;
import io.netty.channel.ChannelPipeline;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/DecoratedConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/connection/DecoratedConnection.java
similarity index 98%
rename from driver/src/main/java/org/neo4j/driver/internal/async/DecoratedConnection.java
rename to driver/src/main/java/org/neo4j/driver/internal/async/connection/DecoratedConnection.java
index 5b52915795..29cacaed80 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/DecoratedConnection.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/connection/DecoratedConnection.java
@@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.neo4j.driver.internal.async;
+package org.neo4j.driver.internal.async.connection;
import java.util.concurrent.CompletionStage;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/DirectConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/connection/DirectConnection.java
similarity index 98%
rename from driver/src/main/java/org/neo4j/driver/internal/async/DirectConnection.java
rename to driver/src/main/java/org/neo4j/driver/internal/async/connection/DirectConnection.java
index b50c5a1338..70820796d0 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/DirectConnection.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/connection/DirectConnection.java
@@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.neo4j.driver.internal.async;
+package org.neo4j.driver.internal.async.connection;
import io.netty.channel.Channel;
import io.netty.channel.pool.ChannelPool;
@@ -40,7 +40,7 @@
import org.neo4j.driver.internal.util.ServerVersion;
import static java.util.Collections.emptyMap;
-import static org.neo4j.driver.internal.async.ChannelAttributes.setTerminationReason;
+import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setTerminationReason;
public class DirectConnection implements Connection
{
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/EventLoopGroupFactory.java b/driver/src/main/java/org/neo4j/driver/internal/async/connection/EventLoopGroupFactory.java
similarity index 99%
rename from driver/src/main/java/org/neo4j/driver/internal/async/EventLoopGroupFactory.java
rename to driver/src/main/java/org/neo4j/driver/internal/async/connection/EventLoopGroupFactory.java
index c4fb71531e..2592f10ed3 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/EventLoopGroupFactory.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/connection/EventLoopGroupFactory.java
@@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.neo4j.driver.internal.async;
+package org.neo4j.driver.internal.async.connection;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/HandshakeCompletedListener.java b/driver/src/main/java/org/neo4j/driver/internal/async/connection/HandshakeCompletedListener.java
similarity index 97%
rename from driver/src/main/java/org/neo4j/driver/internal/async/HandshakeCompletedListener.java
rename to driver/src/main/java/org/neo4j/driver/internal/async/connection/HandshakeCompletedListener.java
index 72a14a90f9..a66c603b11 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/HandshakeCompletedListener.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/connection/HandshakeCompletedListener.java
@@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.neo4j.driver.internal.async;
+package org.neo4j.driver.internal.async.connection;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/HandshakeHandler.java b/driver/src/main/java/org/neo4j/driver/internal/async/connection/HandshakeHandler.java
similarity index 96%
rename from driver/src/main/java/org/neo4j/driver/internal/async/HandshakeHandler.java
rename to driver/src/main/java/org/neo4j/driver/internal/async/connection/HandshakeHandler.java
index 637a0fe55d..cd1af387c7 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/HandshakeHandler.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/connection/HandshakeHandler.java
@@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.neo4j.driver.internal.async;
+package org.neo4j.driver.internal.async.connection;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
@@ -37,8 +37,8 @@
import org.neo4j.driver.exceptions.SecurityException;
import org.neo4j.driver.exceptions.ServiceUnavailableException;
-import static org.neo4j.driver.internal.async.BoltProtocolUtil.HTTP;
-import static org.neo4j.driver.internal.async.BoltProtocolUtil.NO_PROTOCOL_VERSION;
+import static org.neo4j.driver.internal.async.connection.BoltProtocolUtil.HTTP;
+import static org.neo4j.driver.internal.async.connection.BoltProtocolUtil.NO_PROTOCOL_VERSION;
public class HandshakeHandler extends ReplayingDecoder
{
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/NettyChannelInitializer.java b/driver/src/main/java/org/neo4j/driver/internal/async/connection/NettyChannelInitializer.java
similarity index 90%
rename from driver/src/main/java/org/neo4j/driver/internal/async/NettyChannelInitializer.java
rename to driver/src/main/java/org/neo4j/driver/internal/async/connection/NettyChannelInitializer.java
index 859d3f5c9e..74ec34be5a 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/NettyChannelInitializer.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/connection/NettyChannelInitializer.java
@@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.neo4j.driver.internal.async;
+package org.neo4j.driver.internal.async.connection;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
@@ -32,9 +32,9 @@
import org.neo4j.driver.internal.util.Clock;
import org.neo4j.driver.Logging;
-import static org.neo4j.driver.internal.async.ChannelAttributes.setCreationTimestamp;
-import static org.neo4j.driver.internal.async.ChannelAttributes.setMessageDispatcher;
-import static org.neo4j.driver.internal.async.ChannelAttributes.setServerAddress;
+import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setCreationTimestamp;
+import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setMessageDispatcher;
+import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setServerAddress;
public class NettyChannelInitializer extends ChannelInitializer
{
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/connection/RoutingConnection.java
similarity index 98%
rename from driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java
rename to driver/src/main/java/org/neo4j/driver/internal/async/connection/RoutingConnection.java
index a6531629a5..16528529a3 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/connection/RoutingConnection.java
@@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.neo4j.driver.internal.async;
+package org.neo4j.driver.internal.async.connection;
import java.util.concurrent.CompletionStage;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java
index 3f1fc146c6..c80345df4d 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java
@@ -31,8 +31,8 @@
import org.neo4j.driver.exceptions.ServiceUnavailableException;
import static java.util.Objects.requireNonNull;
-import static org.neo4j.driver.internal.async.ChannelAttributes.messageDispatcher;
-import static org.neo4j.driver.internal.async.ChannelAttributes.terminationReason;
+import static org.neo4j.driver.internal.async.connection.ChannelAttributes.messageDispatcher;
+import static org.neo4j.driver.internal.async.connection.ChannelAttributes.terminationReason;
public class ChannelErrorHandler extends ChannelInboundHandlerAdapter
{
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageHandler.java b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageHandler.java
index 842e4373d3..5bd30ae175 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageHandler.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageHandler.java
@@ -30,7 +30,7 @@
import static io.netty.buffer.ByteBufUtil.hexDump;
import static java.util.Objects.requireNonNull;
-import static org.neo4j.driver.internal.async.ChannelAttributes.messageDispatcher;
+import static org.neo4j.driver.internal.async.connection.ChannelAttributes.messageDispatcher;
public class InboundMessageHandler extends SimpleChannelInboundHandler
{
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/outbound/ChunkAwareByteBufOutput.java b/driver/src/main/java/org/neo4j/driver/internal/async/outbound/ChunkAwareByteBufOutput.java
index 79d31cb6be..d65d944ce1 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/outbound/ChunkAwareByteBufOutput.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/outbound/ChunkAwareByteBufOutput.java
@@ -20,12 +20,12 @@
import io.netty.buffer.ByteBuf;
-import org.neo4j.driver.internal.async.BoltProtocolUtil;
+import org.neo4j.driver.internal.async.connection.BoltProtocolUtil;
import org.neo4j.driver.internal.packstream.PackOutput;
import static java.util.Objects.requireNonNull;
-import static org.neo4j.driver.internal.async.BoltProtocolUtil.CHUNK_HEADER_SIZE_BYTES;
-import static org.neo4j.driver.internal.async.BoltProtocolUtil.DEFAULT_MAX_OUTBOUND_CHUNK_SIZE_BYTES;
+import static org.neo4j.driver.internal.async.connection.BoltProtocolUtil.CHUNK_HEADER_SIZE_BYTES;
+import static org.neo4j.driver.internal.async.connection.BoltProtocolUtil.DEFAULT_MAX_OUTBOUND_CHUNK_SIZE_BYTES;
public class ChunkAwareByteBufOutput implements PackOutput
{
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandler.java b/driver/src/main/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandler.java
index 44b1e70436..97c189fd6f 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandler.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandler.java
@@ -25,7 +25,7 @@
import java.util.List;
-import org.neo4j.driver.internal.async.BoltProtocolUtil;
+import org.neo4j.driver.internal.async.connection.BoltProtocolUtil;
import org.neo4j.driver.internal.logging.ChannelActivityLogger;
import org.neo4j.driver.internal.messaging.Message;
import org.neo4j.driver.internal.messaging.MessageFormat;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java b/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java
index 709c348c6e..8a1dd9be72 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java
@@ -34,8 +34,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.neo4j.driver.internal.BoltServerAddress;
-import org.neo4j.driver.internal.async.ChannelConnector;
-import org.neo4j.driver.internal.async.DirectConnection;
+import org.neo4j.driver.internal.async.connection.ChannelConnector;
+import org.neo4j.driver.internal.async.connection.DirectConnection;
import org.neo4j.driver.internal.metrics.ListenerEvent;
import org.neo4j.driver.internal.metrics.MetricsListener;
import org.neo4j.driver.internal.spi.Connection;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthChecker.java b/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthChecker.java
index 2d18fd998a..12fb6a465b 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthChecker.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthChecker.java
@@ -29,9 +29,9 @@
import org.neo4j.driver.Logger;
import org.neo4j.driver.Logging;
-import static org.neo4j.driver.internal.async.ChannelAttributes.creationTimestamp;
-import static org.neo4j.driver.internal.async.ChannelAttributes.lastUsedTimestamp;
-import static org.neo4j.driver.internal.async.ChannelAttributes.messageDispatcher;
+import static org.neo4j.driver.internal.async.connection.ChannelAttributes.creationTimestamp;
+import static org.neo4j.driver.internal.async.connection.ChannelAttributes.lastUsedTimestamp;
+import static org.neo4j.driver.internal.async.connection.ChannelAttributes.messageDispatcher;
public class NettyChannelHealthChecker implements ChannelHealthChecker
{
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelPool.java b/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelPool.java
index 8ca45ff774..8168c09fda 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelPool.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelPool.java
@@ -25,7 +25,7 @@
import io.netty.channel.pool.FixedChannelPool;
import org.neo4j.driver.internal.BoltServerAddress;
-import org.neo4j.driver.internal.async.ChannelConnector;
+import org.neo4j.driver.internal.async.connection.ChannelConnector;
import org.neo4j.driver.internal.metrics.ListenerEvent;
import static java.util.Objects.requireNonNull;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelTracker.java b/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelTracker.java
index d0f789dccc..949b604965 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelTracker.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelTracker.java
@@ -36,7 +36,7 @@
import org.neo4j.driver.Logger;
import org.neo4j.driver.Logging;
-import static org.neo4j.driver.internal.async.ChannelAttributes.serverAddress;
+import static org.neo4j.driver.internal.async.connection.ChannelAttributes.serverAddress;
public class NettyChannelTracker implements ChannelPoolHandler
{
diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunner.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunner.java
index 4b86a751d0..db688e9918 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunner.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunner.java
@@ -24,7 +24,7 @@
import org.neo4j.driver.AccessMode;
import org.neo4j.driver.internal.BookmarksHolder;
-import org.neo4j.driver.internal.async.DecoratedConnection;
+import org.neo4j.driver.internal.async.connection.DecoratedConnection;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.internal.util.ServerVersion;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java
index 466679f052..93adf8fd6b 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java
@@ -25,8 +25,8 @@
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.RoutingErrorHandler;
-import org.neo4j.driver.internal.async.DecoratedConnection;
-import org.neo4j.driver.internal.async.RoutingConnection;
+import org.neo4j.driver.internal.async.connection.DecoratedConnection;
+import org.neo4j.driver.internal.async.connection.RoutingConnection;
import org.neo4j.driver.internal.cluster.AddressSet;
import org.neo4j.driver.internal.cluster.ClusterComposition;
import org.neo4j.driver.internal.cluster.ClusterCompositionProvider;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/cursor/AsyncResultCursorOnlyFactory.java b/driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncResultCursorOnlyFactory.java
similarity index 96%
rename from driver/src/main/java/org/neo4j/driver/internal/reactive/cursor/AsyncResultCursorOnlyFactory.java
rename to driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncResultCursorOnlyFactory.java
index 4a4f2d1b62..6d657afc7e 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/reactive/cursor/AsyncResultCursorOnlyFactory.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncResultCursorOnlyFactory.java
@@ -16,11 +16,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.neo4j.driver.internal.reactive.cursor;
+package org.neo4j.driver.internal.cursor;
import java.util.concurrent.CompletionStage;
-import org.neo4j.driver.internal.AsyncStatementResultCursor;
+import org.neo4j.driver.internal.async.AsyncStatementResultCursor;
import org.neo4j.driver.internal.handlers.PullAllResponseHandler;
import org.neo4j.driver.internal.handlers.RunResponseHandler;
import org.neo4j.driver.internal.messaging.Message;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/cursor/InternalStatementResultCursor.java b/driver/src/main/java/org/neo4j/driver/internal/cursor/InternalStatementResultCursor.java
similarity index 94%
rename from driver/src/main/java/org/neo4j/driver/internal/reactive/cursor/InternalStatementResultCursor.java
rename to driver/src/main/java/org/neo4j/driver/internal/cursor/InternalStatementResultCursor.java
index e17ac81bbe..4560931daf 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/reactive/cursor/InternalStatementResultCursor.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cursor/InternalStatementResultCursor.java
@@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.neo4j.driver.internal.reactive.cursor;
+package org.neo4j.driver.internal.cursor;
import org.neo4j.driver.internal.FailableCursor;
import org.neo4j.driver.async.StatementResultCursor;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/cursor/InternalStatementResultCursorFactory.java b/driver/src/main/java/org/neo4j/driver/internal/cursor/InternalStatementResultCursorFactory.java
similarity index 96%
rename from driver/src/main/java/org/neo4j/driver/internal/reactive/cursor/InternalStatementResultCursorFactory.java
rename to driver/src/main/java/org/neo4j/driver/internal/cursor/InternalStatementResultCursorFactory.java
index 5ddf09941d..ab4fce646b 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/reactive/cursor/InternalStatementResultCursorFactory.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cursor/InternalStatementResultCursorFactory.java
@@ -16,11 +16,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.neo4j.driver.internal.reactive.cursor;
+package org.neo4j.driver.internal.cursor;
import java.util.concurrent.CompletionStage;
-import org.neo4j.driver.internal.AsyncStatementResultCursor;
+import org.neo4j.driver.internal.async.AsyncStatementResultCursor;
import org.neo4j.driver.internal.handlers.PullAllResponseHandler;
import org.neo4j.driver.internal.handlers.RunResponseHandler;
import org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/cursor/RxStatementResultCursor.java b/driver/src/main/java/org/neo4j/driver/internal/cursor/RxStatementResultCursor.java
similarity index 98%
rename from driver/src/main/java/org/neo4j/driver/internal/reactive/cursor/RxStatementResultCursor.java
rename to driver/src/main/java/org/neo4j/driver/internal/cursor/RxStatementResultCursor.java
index 9eff8a906d..788d419b91 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/reactive/cursor/RxStatementResultCursor.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cursor/RxStatementResultCursor.java
@@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.neo4j.driver.internal.reactive.cursor;
+package org.neo4j.driver.internal.cursor;
import org.reactivestreams.Subscription;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/cursor/StatementResultCursorFactory.java b/driver/src/main/java/org/neo4j/driver/internal/cursor/StatementResultCursorFactory.java
similarity index 94%
rename from driver/src/main/java/org/neo4j/driver/internal/reactive/cursor/StatementResultCursorFactory.java
rename to driver/src/main/java/org/neo4j/driver/internal/cursor/StatementResultCursorFactory.java
index 6ab84663a9..200d362925 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/reactive/cursor/StatementResultCursorFactory.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cursor/StatementResultCursorFactory.java
@@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.neo4j.driver.internal.reactive.cursor;
+package org.neo4j.driver.internal.cursor;
import java.util.concurrent.CompletionStage;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandler.java
index 6d61bfb9f6..d96dfc4bf3 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandler.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandler.java
@@ -27,7 +27,7 @@
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
import org.neo4j.driver.internal.util.Clock;
-import static org.neo4j.driver.internal.async.ChannelAttributes.setLastUsedTimestamp;
+import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setLastUsedTimestamp;
public class ChannelReleasingResetResponseHandler extends ResetResponseHandler
{
diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/HelloResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/HelloResponseHandler.java
index 13ca717886..ec088f9304 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/handlers/HelloResponseHandler.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/HelloResponseHandler.java
@@ -27,8 +27,8 @@
import org.neo4j.driver.internal.util.ServerVersion;
import org.neo4j.driver.Value;
-import static org.neo4j.driver.internal.async.ChannelAttributes.setConnectionId;
-import static org.neo4j.driver.internal.async.ChannelAttributes.setServerVersion;
+import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setConnectionId;
+import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setServerVersion;
import static org.neo4j.driver.internal.util.MetadataExtractor.extractNeo4jServerVersion;
public class HelloResponseHandler implements ResponseHandler
diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/InitResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/InitResponseHandler.java
index 4e6bb9dfcf..f03414bf3d 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/handlers/InitResponseHandler.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/InitResponseHandler.java
@@ -29,7 +29,7 @@
import org.neo4j.driver.internal.util.ServerVersion;
import org.neo4j.driver.Value;
-import static org.neo4j.driver.internal.async.ChannelAttributes.setServerVersion;
+import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setServerVersion;
import static org.neo4j.driver.internal.util.MetadataExtractor.extractNeo4jServerVersion;
public class InitResponseHandler implements ResponseHandler
diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullHandlers.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullHandlers.java
index a3b6f2a120..fef1eac382 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullHandlers.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullHandlers.java
@@ -18,15 +18,15 @@
*/
package org.neo4j.driver.internal.handlers;
+import org.neo4j.driver.Statement;
import org.neo4j.driver.internal.BookmarksHolder;
-import org.neo4j.driver.internal.ExplicitTransaction;
+import org.neo4j.driver.internal.async.ExplicitTransaction;
import org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler;
import org.neo4j.driver.internal.handlers.pulln.SessionPullResponseHandler;
import org.neo4j.driver.internal.handlers.pulln.TransactionPullResponseHandler;
import org.neo4j.driver.internal.messaging.v1.BoltProtocolV1;
import org.neo4j.driver.internal.messaging.v3.BoltProtocolV3;
import org.neo4j.driver.internal.spi.Connection;
-import org.neo4j.driver.Statement;
public class PullHandlers
{
diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandler.java
index b9b7e8843e..c9ee224487 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandler.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandler.java
@@ -20,11 +20,11 @@
import java.util.Map;
-import org.neo4j.driver.internal.ExplicitTransaction;
-import org.neo4j.driver.internal.spi.Connection;
-import org.neo4j.driver.internal.util.MetadataExtractor;
import org.neo4j.driver.Statement;
import org.neo4j.driver.Value;
+import org.neo4j.driver.internal.async.ExplicitTransaction;
+import org.neo4j.driver.internal.spi.Connection;
+import org.neo4j.driver.internal.util.MetadataExtractor;
import static java.util.Objects.requireNonNull;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/TransactionPullResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/TransactionPullResponseHandler.java
index eb253abb99..5944d850cb 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/TransactionPullResponseHandler.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/TransactionPullResponseHandler.java
@@ -20,12 +20,12 @@
import java.util.Map;
-import org.neo4j.driver.internal.ExplicitTransaction;
+import org.neo4j.driver.Statement;
+import org.neo4j.driver.Value;
+import org.neo4j.driver.internal.async.ExplicitTransaction;
import org.neo4j.driver.internal.handlers.RunResponseHandler;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.util.MetadataExtractor;
-import org.neo4j.driver.Statement;
-import org.neo4j.driver.Value;
import static java.util.Objects.requireNonNull;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/logging/ChannelActivityLogger.java b/driver/src/main/java/org/neo4j/driver/internal/logging/ChannelActivityLogger.java
index 59f4dac4f2..d45b9e47a3 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/logging/ChannelActivityLogger.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/logging/ChannelActivityLogger.java
@@ -21,7 +21,7 @@
import io.netty.channel.Channel;
import org.neo4j.driver.internal.BoltServerAddress;
-import org.neo4j.driver.internal.async.ChannelAttributes;
+import org.neo4j.driver.internal.async.connection.ChannelAttributes;
import org.neo4j.driver.Logger;
import org.neo4j.driver.Logging;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/messaging/BoltProtocol.java b/driver/src/main/java/org/neo4j/driver/internal/messaging/BoltProtocol.java
index 4cd63a1fbe..c34b1c0d82 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/messaging/BoltProtocol.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/messaging/BoltProtocol.java
@@ -24,23 +24,23 @@
import java.util.Map;
import java.util.concurrent.CompletionStage;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.Statement;
+import org.neo4j.driver.Transaction;
+import org.neo4j.driver.TransactionConfig;
+import org.neo4j.driver.Value;
+import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.internal.Bookmarks;
import org.neo4j.driver.internal.BookmarksHolder;
-import org.neo4j.driver.internal.ExplicitTransaction;
+import org.neo4j.driver.internal.async.ExplicitTransaction;
+import org.neo4j.driver.internal.cursor.StatementResultCursorFactory;
import org.neo4j.driver.internal.messaging.v1.BoltProtocolV1;
import org.neo4j.driver.internal.messaging.v2.BoltProtocolV2;
import org.neo4j.driver.internal.messaging.v3.BoltProtocolV3;
import org.neo4j.driver.internal.messaging.v4.BoltProtocolV4;
import org.neo4j.driver.internal.spi.Connection;
-import org.neo4j.driver.internal.reactive.cursor.StatementResultCursorFactory;
-import org.neo4j.driver.Session;
-import org.neo4j.driver.Statement;
-import org.neo4j.driver.Transaction;
-import org.neo4j.driver.TransactionConfig;
-import org.neo4j.driver.Value;
-import org.neo4j.driver.exceptions.ClientException;
-import static org.neo4j.driver.internal.async.ChannelAttributes.protocolVersion;
+import static org.neo4j.driver.internal.async.connection.ChannelAttributes.protocolVersion;
public interface BoltProtocol
{
diff --git a/driver/src/main/java/org/neo4j/driver/internal/messaging/v1/BoltProtocolV1.java b/driver/src/main/java/org/neo4j/driver/internal/messaging/v1/BoltProtocolV1.java
index add6cfcff5..b4d140c0e5 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/messaging/v1/BoltProtocolV1.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/messaging/v1/BoltProtocolV1.java
@@ -31,7 +31,9 @@
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.internal.Bookmarks;
import org.neo4j.driver.internal.BookmarksHolder;
-import org.neo4j.driver.internal.ExplicitTransaction;
+import org.neo4j.driver.internal.async.ExplicitTransaction;
+import org.neo4j.driver.internal.cursor.AsyncResultCursorOnlyFactory;
+import org.neo4j.driver.internal.cursor.StatementResultCursorFactory;
import org.neo4j.driver.internal.handlers.AbstractPullAllResponseHandler;
import org.neo4j.driver.internal.handlers.BeginTxResponseHandler;
import org.neo4j.driver.internal.handlers.CommitTxResponseHandler;
@@ -46,15 +48,13 @@
import org.neo4j.driver.internal.messaging.request.InitMessage;
import org.neo4j.driver.internal.messaging.request.PullAllMessage;
import org.neo4j.driver.internal.messaging.request.RunMessage;
-import org.neo4j.driver.internal.reactive.cursor.AsyncResultCursorOnlyFactory;
-import org.neo4j.driver.internal.reactive.cursor.StatementResultCursorFactory;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.ResponseHandler;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.internal.util.MetadataExtractor;
import static org.neo4j.driver.Values.ofValue;
-import static org.neo4j.driver.internal.async.ChannelAttributes.messageDispatcher;
+import static org.neo4j.driver.internal.async.connection.ChannelAttributes.messageDispatcher;
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.assertEmptyDatabaseName;
public class BoltProtocolV1 implements BoltProtocol
diff --git a/driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java b/driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java
index fa24cd708a..1fa439f571 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java
@@ -30,7 +30,9 @@
import org.neo4j.driver.Value;
import org.neo4j.driver.internal.Bookmarks;
import org.neo4j.driver.internal.BookmarksHolder;
-import org.neo4j.driver.internal.ExplicitTransaction;
+import org.neo4j.driver.internal.async.ExplicitTransaction;
+import org.neo4j.driver.internal.cursor.AsyncResultCursorOnlyFactory;
+import org.neo4j.driver.internal.cursor.StatementResultCursorFactory;
import org.neo4j.driver.internal.handlers.AbstractPullAllResponseHandler;
import org.neo4j.driver.internal.handlers.BeginTxResponseHandler;
import org.neo4j.driver.internal.handlers.CommitTxResponseHandler;
@@ -44,13 +46,11 @@
import org.neo4j.driver.internal.messaging.request.GoodbyeMessage;
import org.neo4j.driver.internal.messaging.request.HelloMessage;
import org.neo4j.driver.internal.messaging.request.RunWithMetadataMessage;
-import org.neo4j.driver.internal.reactive.cursor.AsyncResultCursorOnlyFactory;
-import org.neo4j.driver.internal.reactive.cursor.StatementResultCursorFactory;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.internal.util.MetadataExtractor;
-import static org.neo4j.driver.internal.async.ChannelAttributes.messageDispatcher;
+import static org.neo4j.driver.internal.async.connection.ChannelAttributes.messageDispatcher;
import static org.neo4j.driver.internal.handlers.PullHandlers.newBoltV3PullAllHandler;
import static org.neo4j.driver.internal.messaging.request.CommitMessage.COMMIT;
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.assertEmptyDatabaseName;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/messaging/v4/BoltProtocolV4.java b/driver/src/main/java/org/neo4j/driver/internal/messaging/v4/BoltProtocolV4.java
index 68f86ca5b4..9a2b33b20d 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/messaging/v4/BoltProtocolV4.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/messaging/v4/BoltProtocolV4.java
@@ -18,11 +18,11 @@
*/
package org.neo4j.driver.internal.messaging.v4;
-import java.util.concurrent.CompletableFuture;
-
import org.neo4j.driver.Statement;
import org.neo4j.driver.internal.BookmarksHolder;
-import org.neo4j.driver.internal.ExplicitTransaction;
+import org.neo4j.driver.internal.async.ExplicitTransaction;
+import org.neo4j.driver.internal.cursor.InternalStatementResultCursorFactory;
+import org.neo4j.driver.internal.cursor.StatementResultCursorFactory;
import org.neo4j.driver.internal.handlers.AbstractPullAllResponseHandler;
import org.neo4j.driver.internal.handlers.RunResponseHandler;
import org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler;
@@ -30,8 +30,6 @@
import org.neo4j.driver.internal.messaging.MessageFormat;
import org.neo4j.driver.internal.messaging.request.RunWithMetadataMessage;
import org.neo4j.driver.internal.messaging.v3.BoltProtocolV3;
-import org.neo4j.driver.internal.reactive.cursor.InternalStatementResultCursorFactory;
-import org.neo4j.driver.internal.reactive.cursor.StatementResultCursorFactory;
import org.neo4j.driver.internal.spi.Connection;
import static org.neo4j.driver.internal.handlers.PullHandlers.newBoltV3PullAllHandler;
@@ -52,9 +50,7 @@ public MessageFormat createMessageFormat()
protected StatementResultCursorFactory buildResultCursorFactory( Connection connection, Statement statement, BookmarksHolder bookmarksHolder,
ExplicitTransaction tx, RunWithMetadataMessage runMessage, boolean waitForRunResponse )
{
- CompletableFuture runCompletedFuture = new CompletableFuture<>();
-
- RunResponseHandler runHandler = new RunResponseHandler( runCompletedFuture, METADATA_EXTRACTOR );
+ RunResponseHandler runHandler = new RunResponseHandler( METADATA_EXTRACTOR );
AbstractPullAllResponseHandler pullAllHandler = newBoltV3PullAllHandler( statement, runHandler, connection, bookmarksHolder, tx );
BasicPullResponseHandler pullHandler = newBoltV4PullHandler( statement, runHandler, connection, bookmarksHolder, tx );
diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/MetricsListener.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/MetricsListener.java
index 006a1e3434..0d5e9288c1 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/metrics/MetricsListener.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/MetricsListener.java
@@ -21,7 +21,7 @@
import java.util.concurrent.TimeUnit;
import org.neo4j.driver.internal.BoltServerAddress;
-import org.neo4j.driver.internal.async.DirectConnection;
+import org.neo4j.driver.internal.async.connection.DirectConnection;
import org.neo4j.driver.internal.async.pool.ConnectionPoolImpl;
import org.neo4j.driver.Config;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxResult.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxResult.java
index 7e136a56d5..d24fa9b45d 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxResult.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxResult.java
@@ -26,9 +26,9 @@
import java.util.function.Supplier;
import org.neo4j.driver.Record;
-import org.neo4j.driver.internal.reactive.cursor.RxStatementResultCursor;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.reactive.RxResult;
+import org.neo4j.driver.internal.cursor.RxStatementResultCursor;
import org.neo4j.driver.summary.ResultSummary;
public class InternalRxResult implements RxResult
@@ -44,7 +44,8 @@ public InternalRxResult( Supplier> curs
@Override
public Publisher keys()
{
- return Flux.defer( () -> Mono.fromCompletionStage( getCursorFuture() ).flatMapIterable( RxStatementResultCursor::keys ).onErrorMap( Futures::completionExceptionCause ) );
+ return Flux.defer( () -> Mono.fromCompletionStage( getCursorFuture() )
+ .flatMapIterable( RxStatementResultCursor::keys ).onErrorMap( Futures::completionExceptionCause ) );
}
@Override
diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java
index 0948599c11..fb1fdb4649 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java
@@ -20,23 +20,20 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import org.neo4j.driver.internal.ExplicitTransaction;
-import org.neo4j.driver.internal.NetworkSession;
+import org.neo4j.driver.AccessMode;
+import org.neo4j.driver.Statement;
+import org.neo4j.driver.TransactionConfig;
+import org.neo4j.driver.internal.async.NetworkSession;
+import org.neo4j.driver.internal.cursor.RxStatementResultCursor;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.reactive.RxResult;
import org.neo4j.driver.reactive.RxSession;
import org.neo4j.driver.reactive.RxTransaction;
import org.neo4j.driver.reactive.RxTransactionWork;
-import org.neo4j.driver.internal.reactive.cursor.RxStatementResultCursor;
-import org.neo4j.driver.AccessMode;
-import org.neo4j.driver.Statement;
-import org.neo4j.driver.TransactionConfig;
-import org.neo4j.driver.exceptions.TransientException;
import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher;
import static org.neo4j.driver.internal.reactive.RxUtils.createMono;
@@ -68,7 +65,26 @@ public Publisher beginTransaction( TransactionConfig config )
session.beginTransactionAsync( config ).whenComplete( ( tx, completionError ) -> {
if ( tx != null )
{
- txFuture.complete( new InternalRxTransaction( (ExplicitTransaction) tx ) );
+ txFuture.complete( new InternalRxTransaction( tx ) );
+ }
+ else
+ {
+ releaseConnectionBeforeReturning( txFuture, completionError );
+ }
+ } );
+ return txFuture;
+ } );
+ }
+
+ private Publisher beginTransaction( AccessMode mode, TransactionConfig config )
+ {
+ return createMono( () ->
+ {
+ CompletableFuture txFuture = new CompletableFuture<>();
+ session.beginTransactionAsync( mode, config ).whenComplete( ( tx, completionError ) -> {
+ if ( tx != null )
+ {
+ txFuture.complete( new InternalRxTransaction( tx ) );
}
else
{
@@ -105,21 +121,8 @@ public Publisher writeTransaction( RxTransactionWork> work,
private Publisher runTransaction( AccessMode mode, RxTransactionWork> work, TransactionConfig config )
{
- // TODO read and write
- Publisher publisher = beginTransaction( config );
- Flux txExecutor = Mono.from( publisher ).flatMapMany( tx -> Flux.from( work.execute( tx ) ).flatMap( t -> Mono.create( sink -> sink.success( t ) ),
- throwable -> Mono.from( tx.rollback() ).then( Mono.error( throwable ) ), // TODO chain errors from rollback to throwable
- () -> Mono.from( tx.commit() ).then( Mono.empty() ) ) );
- return txExecutor.retry( throwable -> {
- if ( throwable instanceof TransientException )
- {
- return true;
- }
- else
- {
- return false;
- }
- } ); // TODO retry
+ Flux repeatableWork = Flux.usingWhen( beginTransaction( mode, config ), work::execute, RxTransaction::commit, RxTransaction::rollback );
+ return session.retryLogic().retryRx( repeatableWork );
}
@Override
@@ -167,7 +170,7 @@ private void releaseConnectionBeforeReturning( CompletableFuture returnFu
// The reason we need to release connection in session is that we do not have a `rxSession.close()`;
// Otherwise, session.close shall handle everything for us.
Throwable error = Futures.completionExceptionCause( completionError );
- session.releaseConnection().whenComplete( ( ignored, closeError ) ->
+ session.releaseConnectionAsync().whenComplete( ( ignored, closeError ) ->
returnFuture.completeExceptionally( Futures.combineErrors( error, closeError ) ) );
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java
index d869601b81..7a5322a477 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java
@@ -22,22 +22,22 @@
import java.util.concurrent.CompletableFuture;
-import org.neo4j.driver.internal.ExplicitTransaction;
+import org.neo4j.driver.Statement;
+import org.neo4j.driver.internal.async.ExplicitTransaction;
+import org.neo4j.driver.internal.cursor.RxStatementResultCursor;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.reactive.RxResult;
import org.neo4j.driver.reactive.RxTransaction;
-import org.neo4j.driver.internal.reactive.cursor.RxStatementResultCursor;
-import org.neo4j.driver.Statement;
import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher;
public class InternalRxTransaction extends AbstractRxStatementRunner implements RxTransaction
{
- private final ExplicitTransaction asyncTx;
+ private final ExplicitTransaction tx;
- public InternalRxTransaction( ExplicitTransaction asyncTx )
+ public InternalRxTransaction( ExplicitTransaction tx )
{
- this.asyncTx = asyncTx;
+ this.tx = tx;
}
@Override
@@ -45,7 +45,7 @@ public RxResult run( Statement statement )
{
return new InternalRxResult( () -> {
CompletableFuture cursorFuture = new CompletableFuture<>();
- asyncTx.runRx( statement ).whenComplete( ( cursor, completionError ) -> {
+ tx.runRx( statement ).whenComplete( ( cursor, completionError ) -> {
if ( cursor != null )
{
cursorFuture.complete( cursor );
@@ -54,9 +54,9 @@ public RxResult run( Statement statement )
{
// We failed to create a result cursor so we cannot rely on result cursor to handle failure.
// The logic here shall be the same as `TransactionPullResponseHandler#afterFailure` as that is where cursor handling failure
- // This is optional as asyncTx still holds a reference to all cursor futures and they will be clean up properly in commit
+ // This is optional as tx still holds a reference to all cursor futures and they will be clean up properly in commit
Throwable error = Futures.completionExceptionCause( completionError );
- asyncTx.markTerminated();
+ tx.markTerminated();
cursorFuture.completeExceptionally( error );
}
} );
@@ -81,11 +81,11 @@ private Publisher close( boolean commit )
return createEmptyPublisher( () -> {
if ( commit )
{
- return asyncTx.commitAsync();
+ return tx.commitAsync();
}
else
{
- return asyncTx.rollbackAsync();
+ return tx.rollbackAsync();
}
} );
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java b/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java
index d48f653f19..6791d66244 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java
@@ -20,22 +20,30 @@
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
-
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+import reactor.util.context.Context;
+import reactor.util.function.Tuples;
+
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import java.util.function.Supplier;
-import org.neo4j.driver.internal.util.Clock;
-import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.Logger;
import org.neo4j.driver.Logging;
import org.neo4j.driver.exceptions.ServiceUnavailableException;
import org.neo4j.driver.exceptions.SessionExpiredException;
import org.neo4j.driver.exceptions.TransientException;
+import org.neo4j.driver.internal.util.Clock;
+import org.neo4j.driver.internal.util.Futures;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -128,6 +136,12 @@ public CompletionStage retryAsync( Supplier> work )
return resultFuture;
}
+ @Override
+ public Publisher retryRx( Publisher work )
+ {
+ return Flux.from( work ).retryWhen( retryRxCondition() );
+ }
+
protected boolean canRetryOn( Throwable error )
{
return error instanceof SessionExpiredException ||
@@ -135,6 +149,52 @@ protected boolean canRetryOn( Throwable error )
isTransientError( error );
}
+ private Function,Publisher> retryRxCondition()
+ {
+ return errorCurrentAttempt -> errorCurrentAttempt.flatMap( e -> Mono.subscriberContext().map( ctx -> Tuples.of( e, ctx ) ) ).flatMap( t2 -> {
+ Throwable lastError = t2.getT1();
+ Context ctx = t2.getT2();
+
+ List errors = ctx.getOrDefault( "errors", null );
+
+ long startTime = ctx.getOrDefault( "startTime", -1L );
+ long nextDelayMs = ctx.getOrDefault( "nextDelayMs", initialRetryDelayMs );
+
+ if( !canRetryOn( lastError ) )
+ {
+ addSuppressed( lastError, errors );
+ return Mono.error( lastError );
+ }
+
+ long currentTime = clock.millis();
+ if ( startTime == -1 )
+ {
+ startTime = currentTime;
+ }
+
+ long elapsedTime = currentTime - startTime;
+ if ( elapsedTime < maxRetryTimeMs )
+ {
+ long delayWithJitterMs = computeDelayWithJitter( nextDelayMs );
+ log.warn( "Reactive transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms", lastError );
+
+ nextDelayMs = (long) (nextDelayMs * multiplier);
+ errors = recordError( lastError, errors );
+
+ // retry on netty event loop thread
+ EventExecutor eventExecutor = eventExecutorGroup.next();
+ return Mono.just( ctx.put( "errors", errors ).put( "startTime", startTime ).put( "nextDelayMs", nextDelayMs ) )
+ .delayElement( Duration.ofMillis( delayWithJitterMs ), Schedulers.fromExecutorService( eventExecutor ) );
+ }
+ else
+ {
+ addSuppressed( lastError, errors );
+
+ return Mono.error( lastError );
+ }
+ } );
+ }
+
private void executeWorkInEventLoop( CompletableFuture resultFuture, Supplier> work )
{
// this is the very first time we execute given work
diff --git a/driver/src/main/java/org/neo4j/driver/internal/retry/RetryLogic.java b/driver/src/main/java/org/neo4j/driver/internal/retry/RetryLogic.java
index d5ba6956f7..332626d1d3 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/retry/RetryLogic.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/retry/RetryLogic.java
@@ -18,6 +18,8 @@
*/
package org.neo4j.driver.internal.retry;
+import org.reactivestreams.Publisher;
+
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;
@@ -26,4 +28,6 @@ public interface RetryLogic
T retry( Supplier work );
CompletionStage retryAsync( Supplier> work );
+
+ Publisher retryRx( Publisher work );
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/util/Futures.java b/driver/src/main/java/org/neo4j/driver/internal/util/Futures.java
index d5d14b3165..e0a50d4a6b 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/util/Futures.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/util/Futures.java
@@ -26,7 +26,7 @@
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
-import org.neo4j.driver.internal.async.EventLoopGroupFactory;
+import org.neo4j.driver.internal.async.connection.EventLoopGroupFactory;
import static java.util.concurrent.CompletableFuture.completedFuture;
diff --git a/driver/src/main/java/org/neo4j/driver/reactive/RxResult.java b/driver/src/main/java/org/neo4j/driver/reactive/RxResult.java
index 231f0aae1c..2ccf52a9ee 100644
--- a/driver/src/main/java/org/neo4j/driver/reactive/RxResult.java
+++ b/driver/src/main/java/org/neo4j/driver/reactive/RxResult.java
@@ -46,8 +46,8 @@ public interface RxResult
* When this publisher is {@linkplain Publisher#subscribe(Subscriber) subscribed}, the query statement is sent to the server and get executed.
* This method does not start the record streaming nor publish query execution error.
* To retrieve the execution result, either {@link #records()} or {@link #summary()} can be used.
- * {@link #records()} starts record streaming and report query execution error.
- * {@link #summary()} skips record streaming and directly report query execution error.
+ * {@link #records()} starts record streaming and reports query execution error.
+ * {@link #summary()} skips record streaming and directly reports query execution error.
*
* Consuming of execution result ensures the resources (such as network connections) used by this result is freed correctly.
* Consuming the keys without consuming the execution result will result in resource leak.
@@ -66,7 +66,7 @@ public interface RxResult
*
* When the record publisher is {@linkplain Publisher#subscribe(Subscriber) subscribed},
* the query statement is executed and the query result is streamed back as a record stream followed by a result summary.
- * This record publisher publishes all records in the result and signal the completion.
+ * This record publisher publishes all records in the result and signals the completion.
* However before completion or error reporting if any, a cleanup of result resources such as network connection will be carried out automatically.
*
* Therefore the {@link Subscriber} of this record publisher shall wait for the termination signal (complete or error)
@@ -77,13 +77,13 @@ public interface RxResult
* But it will not cancel the query execution.
* As a result, a termination signal (complete or error) will still be sent to the {@link Subscriber} after the query execution is finished.
*
- * The record publishing event by default runs in Netty IO thread, as a result no blocking operation is allowed in this thread.
+ * The record publishing event by default runs in an Network IO thread, as a result no blocking operation is allowed in this thread.
* Otherwise network IO might be blocked by application logic.
*
* This publisher can only be subscribed by one {@link Subscriber} once.
*
- * If this publisher is subscribed after {@link #keys()}, then the publish of records is carried out once each record arrives.
- * If this publisher is subscribed after {@link #summary()}, then the publish of records has been cancelled
+ * If this publisher is subscribed after {@link #keys()}, then the publish of records is carried out after the arrival of keys.
+ * If this publisher is subscribed after {@link #summary()}, then the publish of records is already cancelled
* and an empty publisher of zero record will be return.
* @return a cold unicast publisher of records.
*/
@@ -94,7 +94,7 @@ public interface RxResult
*
* {@linkplain Publisher#subscribe(Subscriber) Subscribing} the summary publisher results in the execution of the query followed by the result summary returned.
* The summary publisher cancels record publishing if not yet subscribed and directly streams back the summary on query execution completion.
- * As a result, the invocation of {@link #records()} after this method, would receive a empty publisher.
+ * As a result, the invocation of {@link #records()} after this method, would receive an empty publisher.
*
* If subscribed after {@link #keys()}, then the result summary will be published after the query execution without streaming any record to client.
* If subscribed after {@link #records()}, then the result summary will be published after the query execution and the streaming of records.
diff --git a/driver/src/main/java/org/neo4j/driver/reactive/RxSession.java b/driver/src/main/java/org/neo4j/driver/reactive/RxSession.java
index 2879860eae..9c04f060f0 100644
--- a/driver/src/main/java/org/neo4j/driver/reactive/RxSession.java
+++ b/driver/src/main/java/org/neo4j/driver/reactive/RxSession.java
@@ -21,7 +21,9 @@
import org.reactivestreams.Publisher;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.neo4j.driver.AccessMode;
import org.neo4j.driver.Session;
import org.neo4j.driver.Statement;
import org.neo4j.driver.TransactionConfig;
@@ -43,7 +45,7 @@ public interface RxSession extends RxStatementRunner
* maintain multiple concurrent transactions, use multiple concurrent
* sessions.
*
- * It by default is executed in Netty IO thread, as a result no blocking operation is allowed in this thread.
+ * It by default is executed in a Network IO thread, as a result no blocking operation is allowed in this thread.
*
* @return a new {@link RxTransaction}
*/
@@ -52,22 +54,99 @@ public interface RxSession extends RxStatementRunner
/**
* Begin a new explicit {@linkplain RxTransaction transaction} with the specified {@link TransactionConfig configuration}.
* At most one transaction may exist in a session at any point in time. To
- * maintain multiple concurrent transactions, use multiple concurrent
- * sessions.
+ * maintain multiple concurrent transactions, use multiple concurrent sessions.
*
- * It by default is executed in Netty IO thread, as a result no blocking operation is allowed in this thread.
+ * It by default is executed in a Network IO thread, as a result no blocking operation is allowed in this thread.
*
* @param config configuration for the new transaction.
* @return a new {@link RxTransaction}
*/
Publisher beginTransaction( TransactionConfig config );
+ /**
+ * Execute given unit of reactive work in a {@link AccessMode#READ read} reactive transaction.
+
+ * Transaction will automatically be committed unless given unit of work fails or
+ * {@link RxTransaction#commit() transaction commit} fails.
+ * It will also not be committed if explicitly rolled back via {@link RxTransaction#rollback()}.
+ *
+ * Returned publisher and given {@link RxTransactionWork} is completed/executed by an IO thread which should never block.
+ * Otherwise IO operations on this and potentially other network connections might deadlock.
+ * Please do not chain blocking operations like {@link CompletableFuture#get()} on the returned publisher and do not use them inside the
+ * {@link RxTransactionWork}.
+ *
+ * @param work the {@link RxTransactionWork} to be applied to a new read transaction.
+ * Operation executed by the given work must NOT include any blocking operation.
+ * @param the return type of the given unit of work.
+ * @return a {@link Publisher publisher} completed with the same result as returned by the given unit of work.
+ * publisher can be completed exceptionally if given work or commit fails.
+ *
+ */
Publisher readTransaction( RxTransactionWork> work );
+ /**
+ * Execute given unit of reactive work in a {@link AccessMode#READ read} reactive transaction with
+ * the specified {@link TransactionConfig configuration}.
+
+ * Transaction will automatically be committed unless given unit of work fails or
+ * {@link RxTransaction#commit() transaction commit} fails.
+ * It will also not be committed if explicitly rolled back via {@link RxTransaction#rollback()}.
+ *
+ * Returned publisher and given {@link RxTransactionWork} is completed/executed by an IO thread which should never block.
+ * Otherwise IO operations on this and potentially other network connections might deadlock.
+ * Please do not chain blocking operations like {@link CompletableFuture#get()} on the returned publisher and do not use them inside the
+ * {@link RxTransactionWork}.
+ *
+ * @param work the {@link RxTransactionWork} to be applied to a new read transaction.
+ * Operation executed by the given work must NOT include any blocking operation.
+ * @param the return type of the given unit of work.
+ * @return a {@link Publisher publisher} completed with the same result as returned by the given unit of work.
+ * publisher can be completed exceptionally if given work or commit fails.
+ *
+ */
Publisher readTransaction( RxTransactionWork> work, TransactionConfig config );
+ /**
+ * Execute given unit of reactive work in a {@link AccessMode#WRITE write} reactive transaction.
+
+ * Transaction will automatically be committed unless given unit of work fails or
+ * {@link RxTransaction#commit() transaction commit} fails.
+ * It will also not be committed if explicitly rolled back via {@link RxTransaction#rollback()}.
+ *
+ * Returned publisher and given {@link RxTransactionWork} is completed/executed by an IO thread which should never block.
+ * Otherwise IO operations on this and potentially other network connections might deadlock.
+ * Please do not chain blocking operations like {@link CompletableFuture#get()} on the returned publisher and do not use them inside the
+ * {@link RxTransactionWork}.
+ *
+ * @param work the {@link RxTransactionWork} to be applied to a new read transaction.
+ * Operation executed by the given work must NOT include any blocking operation.
+ * @param the return type of the given unit of work.
+ * @return a {@link Publisher publisher} completed with the same result as returned by the given unit of work.
+ * publisher can be completed exceptionally if given work or commit fails.
+ *
+ */
Publisher writeTransaction( RxTransactionWork> work );
+ /**
+ * Execute given unit of reactive work in a {@link AccessMode#WRITE write} reactive transaction with
+ * the specified {@link TransactionConfig configuration}.
+
+ * Transaction will automatically be committed unless given unit of work fails or
+ * {@link RxTransaction#commit() transaction commit} fails.
+ * It will also not be committed if explicitly rolled back via {@link RxTransaction#rollback()}.
+ *
+ * Returned publisher and given {@link RxTransactionWork} is completed/executed by an IO thread which should never block.
+ * Otherwise IO operations on this and potentially other network connections might deadlock.
+ * Please do not chain blocking operations like {@link CompletableFuture#get()} on the returned publisher and do not use them inside the
+ * {@link RxTransactionWork}.
+ *
+ * @param work the {@link RxTransactionWork} to be applied to a new read transaction.
+ * Operation executed by the given work must NOT include any blocking operation.
+ * @param the return type of the given unit of work.
+ * @return a {@link Publisher publisher} completed with the same result as returned by the given unit of work.
+ * publisher can be completed exceptionally if given work or commit fails.
+ *
+ */
Publisher writeTransaction( RxTransactionWork> work, TransactionConfig config );
/**
diff --git a/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionWork.java b/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionWork.java
index 9cda2a246d..449607c7b3 100644
--- a/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionWork.java
+++ b/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionWork.java
@@ -18,7 +18,21 @@
*/
package org.neo4j.driver.reactive;
+/**
+ * Callback that executes operations against a given {@link RxTransaction}.
+ * To be used with {@link RxSession#readTransaction(RxTransactionWork)} and
+ * {@link RxSession#writeTransaction(RxTransactionWork)} methods.
+ *
+ * @param the return type of this work.
+ * @since 2.0
+ */
public interface RxTransactionWork
{
+ /**
+ * Executes all given operations against the same transaction.
+ *
+ * @param tx the transaction to use.
+ * @return some result object or {@code null} if none.
+ */
T execute( RxTransaction tx );
}
diff --git a/driver/src/test/java/org/neo4j/driver/ParametersTest.java b/driver/src/test/java/org/neo4j/driver/ParametersTest.java
index 7dff56e929..ead3036281 100644
--- a/driver/src/test/java/org/neo4j/driver/ParametersTest.java
+++ b/driver/src/test/java/org/neo4j/driver/ParametersTest.java
@@ -24,12 +24,13 @@
import java.util.stream.Stream;
+import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.internal.DefaultBookmarksHolder;
import org.neo4j.driver.internal.InternalRecord;
-import org.neo4j.driver.internal.NetworkSession;
+import org.neo4j.driver.internal.InternalSession;
+import org.neo4j.driver.internal.async.NetworkSession;
import org.neo4j.driver.internal.retry.RetryLogic;
import org.neo4j.driver.internal.spi.ConnectionProvider;
-import org.neo4j.driver.exceptions.ClientException;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
@@ -39,12 +40,12 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
import static org.mockito.Mockito.mock;
+import static org.neo4j.driver.Values.parameters;
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.ABSENT_DB_NAME;
import static org.neo4j.driver.internal.util.ValueFactory.emptyNodeValue;
import static org.neo4j.driver.internal.util.ValueFactory.emptyRelationshipValue;
import static org.neo4j.driver.internal.util.ValueFactory.filledPathValue;
-import static org.neo4j.driver.Values.parameters;
class ParametersTest
{
@@ -107,6 +108,8 @@ private Session mockedSession()
{
ConnectionProvider provider = mock( ConnectionProvider.class );
RetryLogic retryLogic = mock( RetryLogic.class );
- return new NetworkSession( provider, retryLogic, ABSENT_DB_NAME, AccessMode.WRITE, new DefaultBookmarksHolder(), DEV_NULL_LOGGING );
+ NetworkSession session =
+ new NetworkSession( provider, retryLogic, ABSENT_DB_NAME, AccessMode.WRITE, new DefaultBookmarksHolder(), DEV_NULL_LOGGING );
+ return new InternalSession( session );
}
}
diff --git a/driver/src/test/java/org/neo4j/driver/integration/SessionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/integration/AsyncSessionIT.java
similarity index 99%
rename from driver/src/test/java/org/neo4j/driver/integration/SessionAsyncIT.java
rename to driver/src/test/java/org/neo4j/driver/integration/AsyncSessionIT.java
index 221328ad51..2c5bb64efe 100644
--- a/driver/src/test/java/org/neo4j/driver/integration/SessionAsyncIT.java
+++ b/driver/src/test/java/org/neo4j/driver/integration/AsyncSessionIT.java
@@ -83,7 +83,7 @@
import static org.neo4j.driver.util.TestUtil.awaitAll;
@ParallelizableIT
-class SessionAsyncIT
+class AsyncSessionIT
{
@RegisterExtension
static final DatabaseExtension neo4j = new DatabaseExtension();
diff --git a/driver/src/test/java/org/neo4j/driver/integration/TransactionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/integration/AsyncTransactionIT.java
similarity index 99%
rename from driver/src/test/java/org/neo4j/driver/integration/TransactionAsyncIT.java
rename to driver/src/test/java/org/neo4j/driver/integration/AsyncTransactionIT.java
index beacf7d9bc..6faca07fab 100644
--- a/driver/src/test/java/org/neo4j/driver/integration/TransactionAsyncIT.java
+++ b/driver/src/test/java/org/neo4j/driver/integration/AsyncTransactionIT.java
@@ -42,7 +42,6 @@
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.NoSuchRecordException;
import org.neo4j.driver.exceptions.ServiceUnavailableException;
-import org.neo4j.driver.internal.util.EnabledOnNeo4jWith;
import org.neo4j.driver.summary.ResultSummary;
import org.neo4j.driver.summary.StatementType;
import org.neo4j.driver.types.Node;
@@ -69,7 +68,7 @@
import static org.neo4j.driver.util.TestUtil.await;
@ParallelizableIT
-class TransactionAsyncIT
+class AsyncTransactionIT
{
@RegisterExtension
static final DatabaseExtension neo4j = new DatabaseExtension();
diff --git a/driver/src/test/java/org/neo4j/driver/integration/BookmarkIT.java b/driver/src/test/java/org/neo4j/driver/integration/BookmarkIT.java
index a1b776ae8e..83e54d0665 100644
--- a/driver/src/test/java/org/neo4j/driver/integration/BookmarkIT.java
+++ b/driver/src/test/java/org/neo4j/driver/integration/BookmarkIT.java
@@ -28,8 +28,6 @@
import org.neo4j.driver.Session;
import org.neo4j.driver.Transaction;
import org.neo4j.driver.exceptions.ClientException;
-import org.neo4j.driver.exceptions.TransientException;
-import org.neo4j.driver.internal.util.EnabledOnNeo4jWith;
import org.neo4j.driver.util.ParallelizableIT;
import org.neo4j.driver.util.SessionExtension;
@@ -82,16 +80,6 @@ void shouldThrowForInvalidBookmark()
}
}
- @SuppressWarnings( "deprecation" )
- @Test
- void shouldThrowForUnreachableBookmark()
- {
- createNodeInTx( session );
-
- TransientException e = assertThrows( TransientException.class, () -> session.beginTransaction( session.lastBookmark() + 42 ) );
- assertThat( e.getMessage(), startsWith( "Database not up to the requested version" ) );
- }
-
@Test
void bookmarkRemainsAfterRolledBackTx()
{
diff --git a/driver/src/test/java/org/neo4j/driver/integration/CausalClusteringIT.java b/driver/src/test/java/org/neo4j/driver/integration/CausalClusteringIT.java
index ddaa10fb0a..c38deecc1f 100644
--- a/driver/src/test/java/org/neo4j/driver/integration/CausalClusteringIT.java
+++ b/driver/src/test/java/org/neo4j/driver/integration/CausalClusteringIT.java
@@ -56,7 +56,6 @@
import org.neo4j.driver.exceptions.Neo4jException;
import org.neo4j.driver.exceptions.ServiceUnavailableException;
import org.neo4j.driver.exceptions.SessionExpiredException;
-import org.neo4j.driver.exceptions.TransientException;
import org.neo4j.driver.internal.cluster.RoutingSettings;
import org.neo4j.driver.internal.retry.RetrySettings;
import org.neo4j.driver.internal.util.DisabledOnNeo4jWith;
@@ -77,7 +76,6 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.startsWith;
import static org.hamcrest.junit.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -317,30 +315,6 @@ void beginTransactionThrowsForInvalidBookmark()
}
}
- @SuppressWarnings( "deprecation" )
- @Test
- void beginTransactionThrowsForUnreachableBookmark()
- {
- ClusterMember leader = clusterRule.getCluster().leader();
-
- try ( Driver driver = createDriver( leader.getBoltUri() );
- Session session = driver.session() )
- {
- try ( Transaction tx = session.beginTransaction() )
- {
- tx.run( "CREATE ()" );
- tx.success();
- }
-
- String bookmark = session.lastBookmark();
- assertNotNull( bookmark );
- String newBookmark = bookmark + "0";
-
- TransientException e = assertThrows( TransientException.class, () -> session.beginTransaction( newBookmark ) );
- assertThat( e.getMessage(), startsWith( "Database not up to the requested version" ) );
- }
- }
-
@Test
void shouldHandleGracefulLeaderSwitch() throws Exception
{
diff --git a/driver/src/test/java/org/neo4j/driver/integration/ConnectionHandlingIT.java b/driver/src/test/java/org/neo4j/driver/integration/ConnectionHandlingIT.java
index d85b9ac05f..863394a2ff 100644
--- a/driver/src/test/java/org/neo4j/driver/integration/ConnectionHandlingIT.java
+++ b/driver/src/test/java/org/neo4j/driver/integration/ConnectionHandlingIT.java
@@ -36,7 +36,7 @@
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.ConnectionSettings;
import org.neo4j.driver.internal.DriverFactory;
-import org.neo4j.driver.internal.async.ChannelConnector;
+import org.neo4j.driver.internal.async.connection.ChannelConnector;
import org.neo4j.driver.internal.async.pool.ConnectionPoolImpl;
import org.neo4j.driver.internal.async.pool.PoolSettings;
import org.neo4j.driver.internal.cluster.RoutingSettings;
diff --git a/driver/src/test/java/org/neo4j/driver/integration/ExplicitTransactionIT.java b/driver/src/test/java/org/neo4j/driver/integration/ExplicitTransactionIT.java
index 054ad28529..d51afb8bc2 100644
--- a/driver/src/test/java/org/neo4j/driver/integration/ExplicitTransactionIT.java
+++ b/driver/src/test/java/org/neo4j/driver/integration/ExplicitTransactionIT.java
@@ -31,15 +31,15 @@
import org.neo4j.driver.Config;
import org.neo4j.driver.Driver;
-import org.neo4j.driver.Session;
-import org.neo4j.driver.Transaction;
-import org.neo4j.driver.async.AsyncSession;
-import org.neo4j.driver.async.AsyncTransaction;
+import org.neo4j.driver.Statement;
+import org.neo4j.driver.TransactionConfig;
import org.neo4j.driver.async.StatementResultCursor;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.ServiceUnavailableException;
-import org.neo4j.driver.internal.ExplicitTransaction;
-import org.neo4j.driver.internal.async.EventLoopGroupFactory;
+import org.neo4j.driver.internal.InternalDriver;
+import org.neo4j.driver.internal.SessionParameters;
+import org.neo4j.driver.internal.async.ExplicitTransaction;
+import org.neo4j.driver.internal.async.NetworkSession;
import org.neo4j.driver.internal.cluster.RoutingSettings;
import org.neo4j.driver.internal.retry.RetrySettings;
import org.neo4j.driver.internal.util.Clock;
@@ -47,7 +47,6 @@
import org.neo4j.driver.util.DatabaseExtension;
import org.neo4j.driver.util.ParallelizableIT;
-import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;
import static org.hamcrest.junit.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -57,13 +56,8 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.neo4j.driver.Values.parameters;
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
-import static org.neo4j.driver.internal.util.Matchers.blockingOperationInEventLoopError;
import static org.neo4j.driver.util.TestUtil.await;
-/**
- * We leave the question whether we want to let {@link ExplicitTransaction} both implements blocking tx and async tx or not later.
- * But as how it is right not, here are some tests for using mixes blocking and async API.
- */
@ParallelizableIT
class ExplicitTransactionIT
@@ -71,12 +65,12 @@ class ExplicitTransactionIT
@RegisterExtension
static final DatabaseExtension neo4j = new DatabaseExtension();
- private AsyncSession session;
+ private NetworkSession session;
@BeforeEach
void setUp()
{
- session = neo4j.driver().asyncSession();
+ session = ((InternalDriver) neo4j.driver()).newSession( SessionParameters.empty() );
}
@AfterEach
@@ -85,35 +79,55 @@ void tearDown()
session.closeAsync();
}
+ private ExplicitTransaction beginTransaction()
+ {
+ return beginTransaction( session );
+ }
+
+ private ExplicitTransaction beginTransaction( NetworkSession session )
+ {
+ return await( session.beginTransactionAsync( TransactionConfig.empty() ) );
+ }
+
+ private StatementResultCursor sessionRun( NetworkSession session, Statement statement )
+ {
+ return await( session.runAsync( statement, TransactionConfig.empty(), true ) );
+ }
+
+ private StatementResultCursor txRun( ExplicitTransaction tx, String statement )
+ {
+ return await( tx.runAsync( new Statement( statement ), true ) );
+ }
+
@Test
void shouldDoNothingWhenCommittedSecondTime()
{
- AsyncTransaction tx = await( session.beginTransactionAsync() );
+ ExplicitTransaction tx = beginTransaction();
assertNull( await( tx.commitAsync() ) );
assertTrue( tx.commitAsync().toCompletableFuture().isDone() );
- assertFalse( ((ExplicitTransaction) tx).isOpen() );
+ assertFalse( tx.isOpen() );
}
@Test
void shouldFailToCommitAfterRollback()
{
- AsyncTransaction tx = await( session.beginTransactionAsync() );
+ ExplicitTransaction tx = beginTransaction();
assertNull( await( tx.rollbackAsync() ) );
ClientException e = assertThrows( ClientException.class, () -> await( tx.commitAsync() ) );
assertEquals( "Can't commit, transaction has been rolled back", e.getMessage() );
- assertFalse( ((ExplicitTransaction) tx).isOpen() );
+ assertFalse( tx.isOpen() );
}
@Test
void shouldFailToCommitAfterTermination()
{
- AsyncTransaction tx = await( session.beginTransactionAsync() );
+ ExplicitTransaction tx = beginTransaction();
- ((ExplicitTransaction) tx).markTerminated();
+ tx.markTerminated();
ClientException e = assertThrows( ClientException.class, () -> await( tx.commitAsync() ) );
assertThat( e.getMessage(), startsWith( "Transaction can't be committed" ) );
@@ -122,129 +136,89 @@ void shouldFailToCommitAfterTermination()
@Test
void shouldDoNothingWhenRolledBackSecondTime()
{
- AsyncTransaction tx = await( session.beginTransactionAsync() );
+ ExplicitTransaction tx = beginTransaction();
assertNull( await( tx.rollbackAsync() ) );
assertTrue( tx.rollbackAsync().toCompletableFuture().isDone() );
- assertFalse( ((ExplicitTransaction) tx).isOpen() );
+ assertFalse( tx.isOpen() );
}
@Test
void shouldFailToRollbackAfterCommit()
{
- AsyncTransaction tx = await( session.beginTransactionAsync() );
+ ExplicitTransaction tx = beginTransaction();
assertNull( await( tx.commitAsync() ) );
ClientException e = assertThrows( ClientException.class, () -> await( tx.rollbackAsync() ) );
assertEquals( "Can't rollback, transaction has been committed", e.getMessage() );
- assertFalse( ((ExplicitTransaction) tx).isOpen() );
+ assertFalse( tx.isOpen() );
}
@Test
void shouldRollbackAfterTermination()
{
- AsyncTransaction tx = await( session.beginTransactionAsync() );
+ ExplicitTransaction tx = beginTransaction();
- ((ExplicitTransaction) tx).markTerminated();
+ tx.markTerminated();
assertNull( await( tx.rollbackAsync() ) );
- assertFalse( ((ExplicitTransaction) tx).isOpen() );
+ assertFalse( tx.isOpen() );
}
@Test
void shouldFailToRunQueryWhenMarkedForFailure()
{
- AsyncTransaction tx = await( session.beginTransactionAsync() );
- tx.runAsync( "CREATE (:MyLabel)" );
- ((ExplicitTransaction) tx).failure();
+ ExplicitTransaction tx = beginTransaction();
+ txRun( tx, "CREATE (:MyLabel)" );
+ tx.failure();
- ClientException e = assertThrows( ClientException.class, () -> await( tx.runAsync( "CREATE (:MyOtherLabel)" ) ) );
+ ClientException e = assertThrows( ClientException.class, () -> txRun( tx, "CREATE (:MyOtherLabel)" ) );
assertThat( e.getMessage(), startsWith( "Cannot run more statements in this transaction" ) );
}
@Test
void shouldFailToRunQueryWhenTerminated()
{
- AsyncTransaction tx = await( session.beginTransactionAsync() );
- tx.runAsync( "CREATE (:MyLabel)" );
- ((ExplicitTransaction) tx).markTerminated();
+ ExplicitTransaction tx = beginTransaction();
+ txRun( tx, "CREATE (:MyLabel)" );
+ tx.markTerminated();
- ClientException e = assertThrows( ClientException.class, () -> await( tx.runAsync( "CREATE (:MyOtherLabel)" ) ) );
+ ClientException e = assertThrows( ClientException.class, () -> txRun( tx, "CREATE (:MyOtherLabel)" ) );
assertThat( e.getMessage(), startsWith( "Cannot run more statements in this transaction" ) );
}
@Test
void shouldAllowQueriesWhenMarkedForSuccess()
{
- AsyncTransaction tx = await( session.beginTransactionAsync() );
- tx.runAsync( "CREATE (:MyLabel)" );
+ ExplicitTransaction tx = beginTransaction();
+ txRun( tx, "CREATE (:MyLabel)" );
- ((ExplicitTransaction) tx).success();
+ tx.success();
- tx.runAsync( "CREATE (:MyLabel)" );
+ txRun( tx, "CREATE (:MyLabel)" );
assertNull( await( tx.commitAsync() ) );
- StatementResultCursor cursor = await( session.runAsync( "MATCH (n:MyLabel) RETURN count(n)" ) );
+ StatementResultCursor cursor = sessionRun( session, new Statement( "MATCH (n:MyLabel) RETURN count(n)" ) );
assertEquals( 2, await( cursor.singleAsync() ).get( 0 ).asInt() );
}
- @Test
- void shouldFailToExecuteBlockingRunChainedWithAsyncTransaction()
- {
- CompletionStage result = session.beginTransactionAsync()
- .thenApply( tx ->
- {
- if ( EventLoopGroupFactory.isEventLoopThread( Thread.currentThread() ) )
- {
- IllegalStateException e = assertThrows( IllegalStateException.class, () -> ((ExplicitTransaction) tx).run( "CREATE ()" ) );
- assertThat( e, is( blockingOperationInEventLoopError() ) );
- }
- return null;
- } );
-
- assertNull( await( result ) );
- }
-
- @Test
- void shouldAllowUsingBlockingApiInCommonPoolWhenChaining()
- {
- CompletionStage txStage = session.beginTransactionAsync()
- // move execution to ForkJoinPool.commonPool()
- .thenApplyAsync( asyncTx ->
- {
- Transaction tx = (ExplicitTransaction) asyncTx;
- tx.run( "UNWIND [1,1,2] AS x CREATE (:Node {id: x})" );
- tx.run( "CREATE (:Node {id: 42})" );
- tx.success();
- tx.close();
- return asyncTx;
- } );
-
- AsyncTransaction tx = await( txStage );
-
- assertFalse( ((ExplicitTransaction) tx).isOpen() );
- assertEquals( 2, countNodes( 1 ) );
- assertEquals( 1, countNodes( 2 ) );
- assertEquals( 1, countNodes( 42 ) );
- }
-
@Test
void shouldBePossibleToRunMoreTransactionsAfterOneIsTerminated()
{
- AsyncTransaction tx1 = await( session.beginTransactionAsync() );
- ((ExplicitTransaction) tx1).markTerminated();
+ ExplicitTransaction tx1 = beginTransaction();
+ tx1.markTerminated();
// commit should fail, make session forget about this transaction and release the connection to the pool
ClientException e = assertThrows( ClientException.class, () -> await( tx1.commitAsync() ) );
assertThat( e.getMessage(), startsWith( "Transaction can't be committed" ) );
- await( session.beginTransactionAsync()
- .thenCompose( tx -> tx.runAsync( "CREATE (:Node {id: 42})" )
+ await( session.beginTransactionAsync( TransactionConfig.empty() )
+ .thenCompose( tx -> tx.runAsync( new Statement( "CREATE (:Node {id: 42})" ), true )
.thenCompose( StatementResultCursor::consumeAsync )
.thenApply( ignore -> tx )
- ).thenCompose( AsyncTransaction::commitAsync ) );
+ ).thenCompose( ExplicitTransaction::commitAsync ) );
assertEquals( 1, countNodes( 42 ) );
}
@@ -263,7 +237,8 @@ void shouldPropagateRollbackFailureAfterFatalError()
private int countNodes( Object id )
{
- StatementResultCursor cursor = await( session.runAsync( "MATCH (n:Node {id: $id}) RETURN count(n)", parameters( "id", id ) ) );
+ Statement statement = new Statement( "MATCH (n:Node {id: $id}) RETURN count(n)", parameters( "id", id ) );
+ StatementResultCursor cursor = sessionRun( session, statement );
return await( cursor.singleAsync() ).get( 0 ).asInt();
}
@@ -274,12 +249,12 @@ private void testCommitAndRollbackFailurePropagation( boolean commit )
try ( Driver driver = driverFactory.newInstance( neo4j.uri(), neo4j.authToken(), RoutingSettings.DEFAULT, RetrySettings.DEFAULT, config ) )
{
- try ( Session session = driver.session() )
+ NetworkSession session = ((InternalDriver) driver).newSession( SessionParameters.empty() );
{
- Transaction tx = session.beginTransaction();
+ ExplicitTransaction tx = beginTransaction( session );
// run query but do not consume the result
- tx.run( "UNWIND range(0, 10000) AS x RETURN x + 1" );
+ txRun( tx, "UNWIND range(0, 10000) AS x RETURN x + 1" );
IOException ioError = new IOException( "Connection reset by peer" );
for ( Channel channel : driverFactory.channels() )
@@ -290,8 +265,7 @@ private void testCommitAndRollbackFailurePropagation( boolean commit )
await( future );
}
- AsyncTransaction asyncTx = (ExplicitTransaction) tx;
- CompletionStage commitOrRollback = commit ? asyncTx.commitAsync() : asyncTx.rollbackAsync();
+ CompletionStage commitOrRollback = commit ? tx.commitAsync() : tx.rollbackAsync();
// commit/rollback should fail and propagate the network error
ServiceUnavailableException e = assertThrows( ServiceUnavailableException.class, () -> await( commitOrRollback ) );
diff --git a/driver/src/test/java/org/neo4j/driver/integration/SessionBoltV3IT.java b/driver/src/test/java/org/neo4j/driver/integration/SessionBoltV3IT.java
index e3e5a319da..cddc6567e9 100644
--- a/driver/src/test/java/org/neo4j/driver/integration/SessionBoltV3IT.java
+++ b/driver/src/test/java/org/neo4j/driver/integration/SessionBoltV3IT.java
@@ -19,6 +19,7 @@
package org.neo4j.driver.integration;
import io.netty.channel.Channel;
+import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -29,6 +30,14 @@
import java.util.Map;
import java.util.concurrent.CompletionStage;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.StatementResult;
+import org.neo4j.driver.Transaction;
+import org.neo4j.driver.TransactionConfig;
+import org.neo4j.driver.async.AsyncSession;
+import org.neo4j.driver.async.StatementResultCursor;
+import org.neo4j.driver.exceptions.TransientException;
import org.neo4j.driver.internal.cluster.RoutingSettings;
import org.neo4j.driver.internal.messaging.Message;
import org.neo4j.driver.internal.messaging.request.GoodbyeMessage;
@@ -36,16 +45,9 @@
import org.neo4j.driver.internal.retry.RetrySettings;
import org.neo4j.driver.internal.util.EnabledOnNeo4jWith;
import org.neo4j.driver.internal.util.MessageRecordingDriverFactory;
-import org.neo4j.driver.Driver;
-import org.neo4j.driver.Session;
-import org.neo4j.driver.StatementResult;
-import org.neo4j.driver.async.StatementResultCursor;
-import org.neo4j.driver.Transaction;
-import org.neo4j.driver.TransactionConfig;
-import org.neo4j.driver.exceptions.TransientException;
import org.neo4j.driver.summary.ResultSummary;
+import org.neo4j.driver.util.DriverExtension;
import org.neo4j.driver.util.ParallelizableIT;
-import org.neo4j.driver.util.SessionExtension;
import static java.time.Duration.ofMillis;
import static java.util.Arrays.asList;
@@ -57,8 +59,8 @@
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V3;
import static org.neo4j.driver.Config.defaultConfig;
+import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V3;
import static org.neo4j.driver.util.TestUtil.await;
@EnabledOnNeo4jWith( BOLT_V3 )
@@ -66,7 +68,7 @@
class SessionBoltV3IT
{
@RegisterExtension
- static final SessionExtension session = new SessionExtension();
+ static final DriverExtension driver = new DriverExtension();
@Test
void shouldSetTransactionMetadata()
@@ -81,7 +83,7 @@ void shouldSetTransactionMetadata()
.build();
// call listTransactions procedure that should list itself with the specified metadata
- StatementResult result = session.run( "CALL dbms.listTransactions()", config );
+ StatementResult result = driver.session().run( "CALL dbms.listTransactions()", config );
Map receivedMetadata = result.single().get( "metaData" ).asMap();
assertEquals( metadata, receivedMetadata );
@@ -99,7 +101,8 @@ void shouldSetTransactionMetadataAsync()
.build();
// call listTransactions procedure that should list itself with the specified metadata
- CompletionStage