diff --git a/driver/src/main/java/org/neo4j/driver/Config.java b/driver/src/main/java/org/neo4j/driver/Config.java
index dafbbdc1bc..382bda8770 100644
--- a/driver/src/main/java/org/neo4j/driver/Config.java
+++ b/driver/src/main/java/org/neo4j/driver/Config.java
@@ -18,19 +18,24 @@
*/
package org.neo4j.driver;
+import org.reactivestreams.Subscription;
+
import java.io.File;
import java.net.InetAddress;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
+import org.neo4j.driver.async.AsyncSession;
import org.neo4j.driver.exceptions.ServiceUnavailableException;
import org.neo4j.driver.exceptions.SessionExpiredException;
import org.neo4j.driver.exceptions.TransientException;
import org.neo4j.driver.internal.async.pool.PoolSettings;
import org.neo4j.driver.internal.cluster.RoutingSettings;
+import org.neo4j.driver.internal.handlers.pulln.FetchSizeUtil;
import org.neo4j.driver.internal.retry.RetrySettings;
import org.neo4j.driver.net.ServerAddressResolver;
+import org.neo4j.driver.reactive.RxSession;
import org.neo4j.driver.util.Immutable;
import org.neo4j.driver.util.Resource;
@@ -88,6 +93,7 @@ public class Config
private final int routingFailureLimit;
private final long routingRetryDelayMillis;
+ private final long fetchSize;
private long routingTablePurgeDelayMillis;
private final int connectionTimeoutMillis;
@@ -114,6 +120,7 @@ private Config( ConfigBuilder builder )
this.routingTablePurgeDelayMillis = builder.routingTablePurgeDelayMillis;
this.retrySettings = builder.retrySettings;
this.resolver = builder.resolver;
+ this.fetchSize = builder.fetchSize;
this.isMetricsEnabled = builder.isMetricsEnabled;
}
@@ -230,6 +237,11 @@ RetrySettings retrySettings()
return retrySettings;
}
+ public long fetchSize()
+ {
+ return fetchSize;
+ }
+
/**
* @return if the metrics is enabled or not on this driver.
*/
@@ -258,7 +270,7 @@ public static class ConfigBuilder
private RetrySettings retrySettings = RetrySettings.DEFAULT;
private ServerAddressResolver resolver;
private boolean isMetricsEnabled = false;
-
+ private long fetchSize = FetchSizeUtil.DEFAULT_FETCH_SIZE;
private ConfigBuilder() {}
@@ -566,6 +578,26 @@ public ConfigBuilder withRoutingTablePurgeDelay( long delay, TimeUnit unit )
return this;
}
+ /**
+ * Specify how many records to fetch in each batch.
+ * This config is only valid when the driver is used with servers that support Bolt V4 (Server version 4.0 and later).
+ *
+ * Bolt V4 enables pulling records in batches to allow client to take control of data population and apply back pressure to server.
+ * This config specifies the default fetch size for all query runs using {@link Session} and {@link AsyncSession}.
+ * By default, the value is set to {@code 1000}.
+ * Use {@code -1} to disables back pressure and config client to pull all records at once after each run.
+ *
+ * This config only applies to run result obtained via {@link Session} and {@link AsyncSession}.
+ * As with {@link RxSession}, the batch size is provided via {@link Subscription#request(long)} instead.
+ * @param size the default record fetch size when pulling records in batches using Bolt V4.
+ * @return this builder
+ */
+ public ConfigBuilder withFetchSize( long size )
+ {
+ this.fetchSize = FetchSizeUtil.assertValidFetchSize( size );
+ return this;
+ }
+
/**
* Specify socket connection timeout.
*
diff --git a/driver/src/main/java/org/neo4j/driver/SessionConfig.java b/driver/src/main/java/org/neo4j/driver/SessionConfig.java
index bb8866193b..7a164750bc 100644
--- a/driver/src/main/java/org/neo4j/driver/SessionConfig.java
+++ b/driver/src/main/java/org/neo4j/driver/SessionConfig.java
@@ -18,6 +18,8 @@
*/
package org.neo4j.driver;
+import org.reactivestreams.Subscription;
+
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
@@ -26,6 +28,7 @@
import org.neo4j.driver.reactive.RxSession;
import static java.util.Objects.requireNonNull;
+import static org.neo4j.driver.internal.handlers.pulln.FetchSizeUtil.assertValidFetchSize;
/**
* The session configurations used to configure a session.
@@ -37,12 +40,14 @@ public class SessionConfig
private final Iterable bookmarks;
private final AccessMode defaultAccessMode;
private final String database;
+ private final Optional fetchSize;
private SessionConfig( Builder builder )
{
this.bookmarks = builder.bookmarks;
this.defaultAccessMode = builder.defaultAccessMode;
this.database = builder.database;
+ this.fetchSize = builder.fetchSize;
}
/**
@@ -109,6 +114,15 @@ public Optional database()
return Optional.ofNullable( database );
}
+ /**
+ * This value if set, overrides the default fetch size set on {@link Config#fetchSize()}.
+ * @return an optional value of fetch size.
+ */
+ public Optional fetchSize()
+ {
+ return fetchSize;
+ }
+
@Override
public boolean equals( Object o )
{
@@ -121,7 +135,8 @@ public boolean equals( Object o )
return false;
}
SessionConfig that = (SessionConfig) o;
- return Objects.equals( bookmarks, that.bookmarks ) && defaultAccessMode == that.defaultAccessMode && Objects.equals( database, that.database );
+ return Objects.equals( bookmarks, that.bookmarks ) && defaultAccessMode == that.defaultAccessMode && Objects.equals( database, that.database )
+ && Objects.equals( fetchSize, that.fetchSize );
}
@Override
@@ -133,7 +148,8 @@ public int hashCode()
@Override
public String toString()
{
- return "SessionParameters{" + "bookmarks=" + bookmarks + ", defaultAccessMode=" + defaultAccessMode + ", database='" + database + '\'' + '}';
+ return "SessionParameters{" + "bookmarks=" + bookmarks + ", defaultAccessMode=" + defaultAccessMode + ", database='" + database + '\'' +
+ ", fetchSize=" + fetchSize + '}';
}
/**
@@ -141,6 +157,7 @@ public String toString()
*/
public static class Builder
{
+ private Optional fetchSize = Optional.empty();
private Iterable bookmarks = null;
private AccessMode defaultAccessMode = AccessMode.WRITE;
private String database = null;
@@ -230,6 +247,27 @@ public Builder withDatabase( String database )
return this;
}
+ /**
+ * Specify how many records to fetch in each batch for this session.
+ * This config will overrides the default value set on {@link Config#fetchSize()}.
+ * This config is only valid when the driver is used with servers that support Bolt V4 (Server version 4.0 and later).
+ *
+ * Bolt V4 enables pulling records in batches to allow client to take control of data population and apply back pressure to server.
+ * This config specifies the default fetch size for all query runs using {@link Session} and {@link AsyncSession}.
+ * By default, the value is set to {@code 1000}.
+ * Use {@code -1} to disables back pressure and config client to pull all records at once after each run.
+ *
+ * This config only applies to run result obtained via {@link Session} and {@link AsyncSession}.
+ * As with {@link RxSession}, the batch size is provided via {@link Subscription#request(long)} instead.
+ * @param size the default record fetch size when pulling records in batches using Bolt V4.
+ * @return this builder
+ */
+ public Builder withFetchSize( long size )
+ {
+ this.fetchSize = Optional.of( assertValidFetchSize( size ) );
+ return this;
+ }
+
public SessionConfig build()
{
return new SessionConfig( this );
diff --git a/driver/src/main/java/org/neo4j/driver/StatementResult.java b/driver/src/main/java/org/neo4j/driver/StatementResult.java
index 912f2d1c0c..1feb142ead 100644
--- a/driver/src/main/java/org/neo4j/driver/StatementResult.java
+++ b/driver/src/main/java/org/neo4j/driver/StatementResult.java
@@ -20,11 +20,11 @@
import java.util.Iterator;
import java.util.List;
+import java.util.function.Function;
import java.util.stream.Stream;
import org.neo4j.driver.exceptions.NoSuchRecordException;
import org.neo4j.driver.summary.ResultSummary;
-import java.util.function.Function;
import org.neo4j.driver.util.Resource;
@@ -138,29 +138,12 @@ public interface StatementResult extends Iterator
*/
List list( Function mapFunction );
- /**
- * Consume the entire result, yielding a summary of it.
- *
- * Calling this method exhausts the result.
- *
- *
- * {@code
- * ResultSummary summary = session.run( "PROFILE MATCH (n:User {id: 12345}) RETURN n" ).consume();
- * }
- *
- *
- * @return a summary for the whole query result
- */
- ResultSummary consume();
-
/**
* Return the result summary.
*
- * If the records in the result is not fully consumed, then calling this method will force to pull all remaining
- * records into buffer to yield the summary.
+ * If the records in the result is not fully consumed, then calling this method will exhausts the result.
*
- * If you want to obtain the summary but discard the records, use
- * {@link StatementResult#consume()} instead.
+ * If you want to access unconsumed records after summary, you shall use {@link StatementResult#list()} to buffer all records into memory before summary.
*
* @return a summary for the whole query result.
*/
diff --git a/driver/src/main/java/org/neo4j/driver/async/StatementResultCursor.java b/driver/src/main/java/org/neo4j/driver/async/StatementResultCursor.java
index 0fca6067ef..c1a6763503 100644
--- a/driver/src/main/java/org/neo4j/driver/async/StatementResultCursor.java
+++ b/driver/src/main/java/org/neo4j/driver/async/StatementResultCursor.java
@@ -27,6 +27,7 @@
import org.neo4j.driver.Record;
import org.neo4j.driver.Records;
+import org.neo4j.driver.StatementResult;
import org.neo4j.driver.exceptions.NoSuchRecordException;
import org.neo4j.driver.summary.ResultSummary;
@@ -72,10 +73,9 @@ public interface StatementResultCursor
/**
* Asynchronously retrieve the result summary.
*
- * If the records in the result is not fully consumed, then calling this method will force to pull all remaining
- * records into buffer to yield the summary.
+ * If the records in the result is not fully consumed, then calling this method will exhausts the result.
*
- * If you want to obtain the summary but discard the records, use {@link #consumeAsync()} instead.
+ * If you want to access unconsumed records after summary, you shall use {@link StatementResult#list()} to buffer all records into memory before summary.
*
* @return a {@link CompletionStage} completed with a summary for the whole query result. Stage can also be
* completed exceptionally if query execution fails.
@@ -110,14 +110,6 @@ public interface StatementResultCursor
*/
CompletionStage singleAsync();
- /**
- * Asynchronously consume the entire result, yielding a summary of it. Calling this method exhausts the result.
- *
- * @return a {@link CompletionStage} completed with a summary for the whole query result. Stage can also be
- * completed exceptionally if query execution fails.
- */
- CompletionStage consumeAsync();
-
/**
* Asynchronously apply the given {@link Consumer action} to every record in the result, yielding a summary of it.
*
diff --git a/driver/src/main/java/org/neo4j/driver/internal/FailableCursor.java b/driver/src/main/java/org/neo4j/driver/internal/FailableCursor.java
index b650a2184d..916572b4ad 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/FailableCursor.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/FailableCursor.java
@@ -22,5 +22,6 @@
public interface FailableCursor
{
+ CompletionStage consumeAsync();
CompletionStage failureAsync();
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java b/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java
index 786a979cb7..05fa2b8c65 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java
@@ -164,10 +164,10 @@ private static RuntimeException driverCloseException()
return new IllegalStateException( "This driver instance has already been closed" );
}
- public NetworkSession newSession( SessionConfig parameters )
+ public NetworkSession newSession( SessionConfig config )
{
assertOpen();
- NetworkSession session = sessionFactory.newInstance( parameters );
+ NetworkSession session = sessionFactory.newInstance( config );
if ( closed.get() )
{
// session does not immediately acquire connection, it is fine to just throw
diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java b/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java
index a836a074cf..bc2532346e 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java
@@ -22,18 +22,18 @@
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.CompletionStage;
+import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
-import org.neo4j.driver.internal.spi.Connection;
-import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.Record;
import org.neo4j.driver.StatementResult;
import org.neo4j.driver.async.StatementResultCursor;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.NoSuchRecordException;
+import org.neo4j.driver.internal.spi.Connection;
+import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.summary.ResultSummary;
-import java.util.function.Function;
public class InternalStatementResult implements StatementResult
{
@@ -111,12 +111,6 @@ public List list( Function mapFunction )
return blockingGet( cursor.listAsync( mapFunction ) );
}
- @Override
- public ResultSummary consume()
- {
- return blockingGet( cursor.consumeAsync() );
- }
-
@Override
public ResultSummary summary()
{
diff --git a/driver/src/main/java/org/neo4j/driver/internal/SessionFactory.java b/driver/src/main/java/org/neo4j/driver/internal/SessionFactory.java
index 3df56fff2c..60c6c72c43 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/SessionFactory.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/SessionFactory.java
@@ -25,7 +25,7 @@
public interface SessionFactory
{
- NetworkSession newInstance( SessionConfig parameters );
+ NetworkSession newInstance( SessionConfig sessionConfig );
CompletionStage verifyConnectivity();
diff --git a/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java b/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java
index 157b98bbca..46d8d1da04 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java
@@ -36,6 +36,7 @@ public class SessionFactoryImpl implements SessionFactory
private final RetryLogic retryLogic;
private final Logging logging;
private final boolean leakedSessionsLoggingEnabled;
+ private final long defaultFetchSize;
SessionFactoryImpl( ConnectionProvider connectionProvider, RetryLogic retryLogic, Config config )
{
@@ -43,6 +44,7 @@ public class SessionFactoryImpl implements SessionFactory
this.leakedSessionsLoggingEnabled = config.logLeakedSessions();
this.retryLogic = retryLogic;
this.logging = config.logging();
+ this.defaultFetchSize = config.fetchSize();
}
@Override
@@ -50,7 +52,12 @@ public NetworkSession newInstance( SessionConfig sessionConfig )
{
BookmarkHolder bookmarkHolder = new DefaultBookmarkHolder( InternalBookmark.from( sessionConfig.bookmarks() ) );
return createSession( connectionProvider, retryLogic, parseDatabaseName( sessionConfig ),
- sessionConfig.defaultAccessMode(), bookmarkHolder, logging );
+ sessionConfig.defaultAccessMode(), bookmarkHolder, parseFetchSize( sessionConfig ), logging );
+ }
+
+ private long parseFetchSize( SessionConfig sessionConfig )
+ {
+ return sessionConfig.fetchSize().orElse( defaultFetchSize );
}
private DatabaseName parseDatabaseName( SessionConfig sessionConfig )
@@ -85,10 +92,10 @@ public ConnectionProvider getConnectionProvider()
}
private NetworkSession createSession( ConnectionProvider connectionProvider, RetryLogic retryLogic, DatabaseName databaseName, AccessMode mode,
- BookmarkHolder bookmarkHolder, Logging logging )
+ BookmarkHolder bookmarkHolder, long fetchSize, Logging logging )
{
return leakedSessionsLoggingEnabled
- ? new LeakLoggingNetworkSession( connectionProvider, retryLogic, databaseName, mode, bookmarkHolder, logging )
- : new NetworkSession( connectionProvider, retryLogic, databaseName, mode, bookmarkHolder, logging );
+ ? new LeakLoggingNetworkSession( connectionProvider, retryLogic, databaseName, mode, bookmarkHolder, fetchSize, logging )
+ : new NetworkSession( connectionProvider, retryLogic, databaseName, mode, bookmarkHolder, fetchSize, logging );
}
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/async/ExplicitTransaction.java
index e8d613fa81..a5c9e3d6e6 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/ExplicitTransaction.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/ExplicitTransaction.java
@@ -29,7 +29,7 @@
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.internal.BookmarkHolder;
import org.neo4j.driver.internal.InternalBookmark;
-import org.neo4j.driver.internal.cursor.InternalStatementResultCursor;
+import org.neo4j.driver.internal.cursor.AsyncStatementResultCursor;
import org.neo4j.driver.internal.cursor.RxStatementResultCursor;
import org.neo4j.driver.internal.messaging.BoltProtocol;
import org.neo4j.driver.internal.spi.Connection;
@@ -62,15 +62,17 @@ private enum State
private final BoltProtocol protocol;
private final BookmarkHolder bookmarkHolder;
private final ResultCursorsHolder resultCursors;
+ private final long fetchSize;
private volatile State state = State.ACTIVE;
- public ExplicitTransaction( Connection connection, BookmarkHolder bookmarkHolder )
+ public ExplicitTransaction( Connection connection, BookmarkHolder bookmarkHolder, long fetchSize )
{
this.connection = connection;
this.protocol = connection.protocol();
this.bookmarkHolder = bookmarkHolder;
this.resultCursors = new ResultCursorsHolder();
+ this.fetchSize = fetchSize;
}
public CompletionStage beginAsync( InternalBookmark initialBookmark, TransactionConfig config )
@@ -139,8 +141,8 @@ else if ( state == State.ROLLED_BACK )
public CompletionStage runAsync( Statement statement, boolean waitForRunResponse )
{
ensureCanRunQueries();
- CompletionStage cursorStage =
- protocol.runInExplicitTransaction( connection, statement, this, waitForRunResponse ).asyncResult();
+ CompletionStage cursorStage =
+ protocol.runInExplicitTransaction( connection, statement, this, waitForRunResponse, fetchSize ).asyncResult();
resultCursors.add( cursorStage );
return cursorStage.thenApply( cursor -> cursor );
}
@@ -149,7 +151,7 @@ public CompletionStage runRx( Statement statement )
{
ensureCanRunQueries();
CompletionStage cursorStage =
- protocol.runInExplicitTransaction( connection, statement, this, false ).rxResult();
+ protocol.runInExplicitTransaction( connection, statement, this, false, fetchSize ).rxResult();
resultCursors.add( cursorStage );
return cursorStage;
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/LeakLoggingNetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/async/LeakLoggingNetworkSession.java
index 476e233296..adf4495d2c 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/LeakLoggingNetworkSession.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/LeakLoggingNetworkSession.java
@@ -33,9 +33,9 @@ public class LeakLoggingNetworkSession extends NetworkSession
private final String stackTrace;
public LeakLoggingNetworkSession( ConnectionProvider connectionProvider, RetryLogic retryLogic, DatabaseName databaseName, AccessMode mode,
- BookmarkHolder bookmarkHolder, Logging logging )
+ BookmarkHolder bookmarkHolder, long fetchSize, Logging logging )
{
- super( connectionProvider, retryLogic, databaseName, mode, bookmarkHolder, logging );
+ super( connectionProvider, retryLogic, databaseName, mode, bookmarkHolder, fetchSize, logging );
this.stackTrace = captureStackTrace();
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java
index 4d23f2820e..e0c3632ee7 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java
@@ -34,7 +34,7 @@
import org.neo4j.driver.internal.DatabaseName;
import org.neo4j.driver.internal.FailableCursor;
import org.neo4j.driver.internal.InternalBookmark;
-import org.neo4j.driver.internal.cursor.InternalStatementResultCursor;
+import org.neo4j.driver.internal.cursor.AsyncStatementResultCursor;
import org.neo4j.driver.internal.cursor.RxStatementResultCursor;
import org.neo4j.driver.internal.cursor.StatementResultCursorFactory;
import org.neo4j.driver.internal.logging.PrefixedLogger;
@@ -57,6 +57,7 @@ public class NetworkSession
protected final Logger logger;
private final BookmarkHolder bookmarkHolder;
+ private final long fetchSize;
private volatile CompletionStage transactionStage = completedWithNull();
private volatile CompletionStage connectionStage = completedWithNull();
private volatile CompletionStage extends FailableCursor> resultCursorStage = completedWithNull();
@@ -64,7 +65,7 @@ public class NetworkSession
private final AtomicBoolean open = new AtomicBoolean( true );
public NetworkSession( ConnectionProvider connectionProvider, RetryLogic retryLogic, DatabaseName databaseName, AccessMode mode,
- BookmarkHolder bookmarkHolder, Logging logging )
+ BookmarkHolder bookmarkHolder, long fetchSize, Logging logging )
{
this.connectionProvider = connectionProvider;
this.mode = mode;
@@ -72,11 +73,12 @@ public NetworkSession( ConnectionProvider connectionProvider, RetryLogic retryLo
this.logger = new PrefixedLogger( "[" + hashCode() + "]", logging.getLog( LOG_NAME ) );
this.bookmarkHolder = bookmarkHolder;
this.connectionContext = new NetworkSessionConnectionContext( databaseName, bookmarkHolder.getBookmark() );
+ this.fetchSize = fetchSize;
}
public CompletionStage runAsync( Statement statement, TransactionConfig config, boolean waitForRunResponse )
{
- CompletionStage newResultCursorStage =
+ CompletionStage newResultCursorStage =
buildResultCursorFactory( statement, config, waitForRunResponse ).thenCompose( StatementResultCursorFactory::asyncResult );
resultCursorStage = newResultCursorStage.exceptionally( error -> null );
@@ -106,7 +108,7 @@ public CompletionStage beginTransactionAsync( AccessMode mo
.thenCompose( ignore -> acquireConnection( mode ) )
.thenCompose( connection ->
{
- ExplicitTransaction tx = new ExplicitTransaction( connection, bookmarkHolder );
+ ExplicitTransaction tx = new ExplicitTransaction( connection, bookmarkHolder, fetchSize );
return tx.beginAsync( bookmarkHolder.getBookmark(), config );
} );
@@ -194,7 +196,7 @@ public CompletionStage closeAsync()
if ( cursor != null )
{
// there exists a cursor with potentially unconsumed error, try to extract and propagate it
- return cursor.failureAsync();
+ return cursor.consumeAsync();
}
// no result cursor exists so no error exists
return completedWithNull();
@@ -231,7 +233,7 @@ private CompletionStage buildResultCursorFactory(
try
{
StatementResultCursorFactory factory = connection.protocol()
- .runInAutoCommitTransaction( connection, statement, bookmarkHolder, config, waitForRunResponse );
+ .runInAutoCommitTransaction( connection, statement, bookmarkHolder, config, waitForRunResponse, fetchSize );
return completedFuture( factory );
}
catch ( Throwable e )
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/ResultCursorsHolder.java b/driver/src/main/java/org/neo4j/driver/internal/async/ResultCursorsHolder.java
index 319f576fac..355c16ad25 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/ResultCursorsHolder.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/ResultCursorsHolder.java
@@ -74,6 +74,6 @@ private static CompletionStage retrieveFailure( CompletionStage ext
{
return cursorStage
.exceptionally( cursor -> null )
- .thenCompose( cursor -> cursor == null ? completedWithNull() : cursor.failureAsync() );
+ .thenCompose( cursor -> cursor == null ? completedWithNull() : cursor.consumeAsync() );
}
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunner.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunner.java
index b9c8a74d68..5746fabe3b 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunner.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunner.java
@@ -39,6 +39,7 @@
import static org.neo4j.driver.Values.parameters;
import static org.neo4j.driver.internal.DatabaseNameUtil.defaultDatabase;
+import static org.neo4j.driver.internal.handlers.pulln.FetchSizeUtil.UNLIMITED_FETCH_SIZE;
public class RoutingProcedureRunner
{
@@ -86,7 +87,7 @@ BookmarkHolder bookmarkHolder( InternalBookmark ignored )
CompletionStage> runProcedure( Connection connection, Statement procedure, BookmarkHolder bookmarkHolder )
{
return connection.protocol()
- .runInAutoCommitTransaction( connection, procedure, bookmarkHolder, TransactionConfig.empty(), true )
+ .runInAutoCommitTransaction( connection, procedure, bookmarkHolder, TransactionConfig.empty(), true, UNLIMITED_FETCH_SIZE )
.asyncResult().thenCompose( StatementResultCursor::listAsync );
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/cursor/InternalStatementResultCursor.java b/driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncStatementResultCursor.java
similarity index 89%
rename from driver/src/main/java/org/neo4j/driver/internal/cursor/InternalStatementResultCursor.java
rename to driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncStatementResultCursor.java
index 4560931daf..eefa25a3ee 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/cursor/InternalStatementResultCursor.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncStatementResultCursor.java
@@ -21,6 +21,6 @@
import org.neo4j.driver.internal.FailableCursor;
import org.neo4j.driver.async.StatementResultCursor;
-public interface InternalStatementResultCursor extends StatementResultCursor, FailableCursor
+public interface AsyncStatementResultCursor extends StatementResultCursor, FailableCursor
{
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/AsyncStatementResultCursor.java b/driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncStatementResultCursorImpl.java
similarity index 88%
rename from driver/src/main/java/org/neo4j/driver/internal/async/AsyncStatementResultCursor.java
rename to driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncStatementResultCursorImpl.java
index dd82c5f2cd..97ba5cd031 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/AsyncStatementResultCursor.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncStatementResultCursorImpl.java
@@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.neo4j.driver.internal.async;
+package org.neo4j.driver.internal.cursor;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -24,29 +24,28 @@
import java.util.function.Consumer;
import java.util.function.Function;
+import org.neo4j.driver.Record;
+import org.neo4j.driver.exceptions.NoSuchRecordException;
import org.neo4j.driver.internal.handlers.PullAllResponseHandler;
import org.neo4j.driver.internal.handlers.RunResponseHandler;
import org.neo4j.driver.internal.util.Futures;
-import org.neo4j.driver.internal.cursor.InternalStatementResultCursor;
-import org.neo4j.driver.Record;
-import org.neo4j.driver.exceptions.NoSuchRecordException;
import org.neo4j.driver.summary.ResultSummary;
-public class AsyncStatementResultCursor implements InternalStatementResultCursor
+public class AsyncStatementResultCursorImpl implements AsyncStatementResultCursor
{
- private final RunResponseHandler runResponseHandler;
+ private final RunResponseHandler runHandler;
private final PullAllResponseHandler pullAllHandler;
- public AsyncStatementResultCursor( RunResponseHandler runResponseHandler, PullAllResponseHandler pullAllHandler )
+ public AsyncStatementResultCursorImpl( RunResponseHandler runHandler, PullAllResponseHandler pullAllHandler )
{
- this.runResponseHandler = runResponseHandler;
+ this.runHandler = runHandler;
this.pullAllHandler = pullAllHandler;
}
@Override
public List keys()
{
- return runResponseHandler.statementKeys();
+ return runHandler.statementKeys();
}
@Override
@@ -91,12 +90,6 @@ public CompletionStage singleAsync()
} );
}
- @Override
- public CompletionStage consumeAsync()
- {
- return pullAllHandler.consumeAsync();
- }
-
@Override
public CompletionStage forEachAsync( Consumer action )
{
@@ -117,12 +110,19 @@ public CompletionStage> listAsync( Function mapFunction )
return pullAllHandler.listAsync( mapFunction );
}
+ @Override
+ public CompletionStage consumeAsync()
+ {
+ return pullAllHandler.summaryAsync().handle( ( summary, error ) -> error );
+ }
+
@Override
public CompletionStage failureAsync()
{
return pullAllHandler.failureAsync();
}
+
private void internalForEachAsync( Consumer action, CompletableFuture resultFuture )
{
CompletionStage recordFuture = nextAsync();
diff --git a/driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncResultCursorOnlyFactory.java b/driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncStatementResultCursorOnlyFactory.java
similarity index 81%
rename from driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncResultCursorOnlyFactory.java
rename to driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncStatementResultCursorOnlyFactory.java
index 6d657afc7e..6752e0a249 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncResultCursorOnlyFactory.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncStatementResultCursorOnlyFactory.java
@@ -20,22 +20,20 @@
import java.util.concurrent.CompletionStage;
-import org.neo4j.driver.internal.async.AsyncStatementResultCursor;
+import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.internal.handlers.PullAllResponseHandler;
import org.neo4j.driver.internal.handlers.RunResponseHandler;
import org.neo4j.driver.internal.messaging.Message;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.util.Futures;
-import org.neo4j.driver.exceptions.ClientException;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
-import static org.neo4j.driver.internal.messaging.request.PullAllMessage.PULL_ALL;
/**
* Used by Bolt V1, V2, V3
*/
-public class AsyncResultCursorOnlyFactory implements StatementResultCursorFactory
+public class AsyncStatementResultCursorOnlyFactory implements StatementResultCursorFactory
{
protected final Connection connection;
protected final Message runMessage;
@@ -43,7 +41,7 @@ public class AsyncResultCursorOnlyFactory implements StatementResultCursorFactor
protected final PullAllResponseHandler pullAllHandler;
private final boolean waitForRunResponse;
- public AsyncResultCursorOnlyFactory( Connection connection, Message runMessage, RunResponseHandler runHandler,
+ public AsyncStatementResultCursorOnlyFactory( Connection connection, Message runMessage, RunResponseHandler runHandler,
PullAllResponseHandler pullHandler, boolean waitForRunResponse )
{
requireNonNull( connection );
@@ -59,19 +57,20 @@ public AsyncResultCursorOnlyFactory( Connection connection, Message runMessage,
this.waitForRunResponse = waitForRunResponse;
}
- public CompletionStage asyncResult()
+ public CompletionStage asyncResult()
{
// only write and flush messages when async result is wanted.
- connection.writeAndFlush( runMessage, runHandler, PULL_ALL, pullAllHandler );
+ connection.write( runMessage, runHandler ); // queues the run message, will be flushed with pull message together
+ pullAllHandler.prePopulateRecords();
if ( waitForRunResponse )
{
// wait for response of RUN before proceeding
- return runHandler.runFuture().thenApply( ignore -> new AsyncStatementResultCursor( runHandler, pullAllHandler ) );
+ return runHandler.runFuture().thenApply( ignore -> new AsyncStatementResultCursorImpl( runHandler, pullAllHandler ) );
}
else
{
- return completedFuture( new AsyncStatementResultCursor( runHandler, pullAllHandler ) );
+ return completedFuture( new AsyncStatementResultCursorImpl( runHandler, pullAllHandler ) );
}
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/cursor/RxStatementResultCursor.java b/driver/src/main/java/org/neo4j/driver/internal/cursor/RxStatementResultCursor.java
index bf977e2d3d..62bd08daa9 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/cursor/RxStatementResultCursor.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cursor/RxStatementResultCursor.java
@@ -21,122 +21,20 @@
import org.reactivestreams.Subscription;
import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
-import org.neo4j.driver.internal.FailableCursor;
-import org.neo4j.driver.internal.handlers.RunResponseHandler;
-import org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler;
import org.neo4j.driver.Record;
+import org.neo4j.driver.internal.FailableCursor;
import org.neo4j.driver.summary.ResultSummary;
-import static org.neo4j.driver.internal.handlers.pulln.AbstractBasicPullResponseHandler.DISCARD_RECORD_CONSUMER;
-
-public class RxStatementResultCursor implements Subscription, FailableCursor
+public interface RxStatementResultCursor extends Subscription, FailableCursor
{
- private final RunResponseHandler runHandler;
- private final BasicPullResponseHandler pullHandler;
- private final Throwable runResponseError;
- private final CompletableFuture summaryFuture = new CompletableFuture<>();
- private boolean isRecordHandlerInstalled = false;
-
- public RxStatementResultCursor( RunResponseHandler runHandler, BasicPullResponseHandler pullHandler )
- {
- this( null, runHandler, pullHandler );
- }
-
- public RxStatementResultCursor( Throwable runError, RunResponseHandler runHandler, BasicPullResponseHandler pullHandler )
- {
- Objects.requireNonNull( runHandler );
- Objects.requireNonNull( pullHandler );
- assertRunResponseArrived( runHandler );
-
- this.runResponseError = runError;
- this.runHandler = runHandler;
- this.pullHandler = pullHandler;
- installSummaryConsumer();
- }
-
- public List keys()
- {
- return runHandler.statementKeys();
- }
-
- public void installRecordConsumer( BiConsumer recordConsumer )
- {
- if ( isRecordHandlerInstalled )
- {
- return;
- }
- isRecordHandlerInstalled = true;
- pullHandler.installRecordConsumer( recordConsumer );
- assertRunCompletedSuccessfully();
- }
-
- public void request( long n )
- {
- pullHandler.request( n );
- }
-
- @Override
- public void cancel()
- {
- pullHandler.cancel();
- }
-
- @Override
- public CompletionStage failureAsync()
- {
- // calling this method will enforce discarding record stream and finish running cypher query
- return summaryAsync().thenApply( summary -> (Throwable) null ).exceptionally( error -> error );
- }
-
- public CompletionStage summaryAsync()
- {
- if ( !isDone() ) // the summary is called before record streaming
- {
- installRecordConsumer( DISCARD_RECORD_CONSUMER );
- cancel();
- }
-
- return this.summaryFuture;
- }
-
- public boolean isDone()
- {
- return summaryFuture.isDone();
- }
+ List keys();
- private void assertRunCompletedSuccessfully()
- {
- if ( runResponseError != null )
- {
- pullHandler.onFailure( runResponseError );
- }
- }
+ void installRecordConsumer( BiConsumer recordConsumer );
- private void installSummaryConsumer()
- {
- pullHandler.installSummaryConsumer( ( summary, error ) -> {
- if ( error != null )
- {
- summaryFuture.completeExceptionally( error );
- }
- else if ( summary != null )
- {
- summaryFuture.complete( summary );
- }
- //else (null, null) to indicate a has_more success
- } );
- }
+ CompletionStage summaryAsync();
- private void assertRunResponseArrived( RunResponseHandler runHandler )
- {
- if ( !runHandler.runFuture().isDone() )
- {
- throw new IllegalStateException( "Should wait for response of RUN before allowing PULL." );
- }
- }
+ boolean isDone();
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/cursor/RxStatementResultCursorImpl.java b/driver/src/main/java/org/neo4j/driver/internal/cursor/RxStatementResultCursorImpl.java
new file mode 100644
index 0000000000..689320a7c0
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/cursor/RxStatementResultCursorImpl.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright (c) 2002-2019 "Neo4j,"
+ * Neo4j Sweden AB [http://neo4j.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.cursor;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.function.BiConsumer;
+
+import org.neo4j.driver.Record;
+import org.neo4j.driver.internal.handlers.RunResponseHandler;
+import org.neo4j.driver.internal.handlers.pulln.PullResponseHandler;
+import org.neo4j.driver.summary.ResultSummary;
+
+public class RxStatementResultCursorImpl implements RxStatementResultCursor
+{
+ static final BiConsumer DISCARD_RECORD_CONSUMER = ( record, throwable ) -> {/*do nothing*/};
+ private final RunResponseHandler runHandler;
+ private final PullResponseHandler pullHandler;
+ private final Throwable runResponseError;
+ private final CompletableFuture summaryFuture = new CompletableFuture<>();
+ private BiConsumer recordConsumer;
+
+ public RxStatementResultCursorImpl( RunResponseHandler runHandler, PullResponseHandler pullHandler )
+ {
+ this( null, runHandler, pullHandler );
+ }
+
+ public RxStatementResultCursorImpl( Throwable runError, RunResponseHandler runHandler, PullResponseHandler pullHandler )
+ {
+ Objects.requireNonNull( runHandler );
+ Objects.requireNonNull( pullHandler );
+ assertRunResponseArrived( runHandler );
+
+ this.runResponseError = runError;
+ this.runHandler = runHandler;
+ this.pullHandler = pullHandler;
+ installSummaryConsumer();
+ }
+
+ @Override
+ public List keys()
+ {
+ return runHandler.statementKeys();
+ }
+
+ @Override
+ public void installRecordConsumer( BiConsumer recordConsumer )
+ {
+ if ( isRecordConsumerInstalled() )
+ {
+ return;
+ }
+ this.recordConsumer = recordConsumer;
+ pullHandler.installRecordConsumer( this.recordConsumer );
+ assertRunCompletedSuccessfully();
+ }
+
+ private boolean isRecordConsumerInstalled()
+ {
+ return this.recordConsumer != null;
+ }
+
+ @Override
+ public void request( long n )
+ {
+ pullHandler.request( n );
+ }
+
+ @Override
+ public void cancel()
+ {
+ pullHandler.cancel();
+ }
+
+ @Override
+ public CompletionStage consumeAsync()
+ {
+ // calling this method will enforce discarding record stream and finish running cypher query
+ return summaryAsync().thenApply( summary -> (Throwable) null ).exceptionally( error -> error );
+ }
+
+ @Override
+ public CompletionStage failureAsync()
+ {
+ // It is safe to discard records as either the streaming has not started at all, or the streaming is fully finished.
+ return consumeAsync();
+ }
+
+ @Override
+ public CompletionStage summaryAsync()
+ {
+ if ( !isDone() ) // the summary is called before record streaming
+ {
+ installRecordConsumer( DISCARD_RECORD_CONSUMER );
+ cancel();
+ }
+
+ return this.summaryFuture;
+ }
+
+ @Override
+ public boolean isDone()
+ {
+ return summaryFuture.isDone();
+ }
+
+ private void assertRunCompletedSuccessfully()
+ {
+ if ( runResponseError != null )
+ {
+ pullHandler.onFailure( runResponseError );
+ }
+ }
+
+ private void installSummaryConsumer()
+ {
+ pullHandler.installSummaryConsumer( ( summary, error ) -> {
+ if ( error != null && recordConsumer == DISCARD_RECORD_CONSUMER )
+ {
+ // We will only report the error to summary if there is no user record consumer installed
+ // When a user record consumer is installed, the error will be reported to record consumer instead.
+ summaryFuture.completeExceptionally( error );
+ }
+ else if ( summary != null )
+ {
+ summaryFuture.complete( summary );
+ }
+ //else (null, null) to indicate a has_more success
+ } );
+ }
+
+ private void assertRunResponseArrived( RunResponseHandler runHandler )
+ {
+ if ( !runHandler.runFuture().isDone() )
+ {
+ throw new IllegalStateException( "Should wait for response of RUN before allowing PULL." );
+ }
+ }
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/cursor/StatementResultCursorFactory.java b/driver/src/main/java/org/neo4j/driver/internal/cursor/StatementResultCursorFactory.java
index 200d362925..9c0cf01b96 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/cursor/StatementResultCursorFactory.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cursor/StatementResultCursorFactory.java
@@ -22,7 +22,7 @@
public interface StatementResultCursorFactory
{
- CompletionStage asyncResult();
+ CompletionStage asyncResult();
CompletionStage rxResult();
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/cursor/InternalStatementResultCursorFactory.java b/driver/src/main/java/org/neo4j/driver/internal/cursor/StatementResultCursorFactoryImpl.java
similarity index 75%
rename from driver/src/main/java/org/neo4j/driver/internal/cursor/InternalStatementResultCursorFactory.java
rename to driver/src/main/java/org/neo4j/driver/internal/cursor/StatementResultCursorFactoryImpl.java
index ab4fce646b..5259bc26cb 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/cursor/InternalStatementResultCursorFactory.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cursor/StatementResultCursorFactoryImpl.java
@@ -20,28 +20,29 @@
import java.util.concurrent.CompletionStage;
-import org.neo4j.driver.internal.async.AsyncStatementResultCursor;
import org.neo4j.driver.internal.handlers.PullAllResponseHandler;
import org.neo4j.driver.internal.handlers.RunResponseHandler;
-import org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler;
+import org.neo4j.driver.internal.handlers.pulln.PullResponseHandler;
import org.neo4j.driver.internal.messaging.Message;
-import org.neo4j.driver.internal.messaging.request.PullMessage;
import org.neo4j.driver.internal.spi.Connection;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
-public class InternalStatementResultCursorFactory implements StatementResultCursorFactory
+/**
+ * Bolt V4
+ */
+public class StatementResultCursorFactoryImpl implements StatementResultCursorFactory
{
private final RunResponseHandler runHandler;
private final Connection connection;
- private final BasicPullResponseHandler pullHandler;
+ private final PullResponseHandler pullHandler;
private final PullAllResponseHandler pullAllHandler;
private final boolean waitForRunResponse;
private final Message runMessage;
- public InternalStatementResultCursorFactory( Connection connection, Message runMessage, RunResponseHandler runHandler, BasicPullResponseHandler pullHandler,
+ public StatementResultCursorFactoryImpl( Connection connection, Message runMessage, RunResponseHandler runHandler, PullResponseHandler pullHandler,
PullAllResponseHandler pullAllHandler, boolean waitForRunResponse )
{
requireNonNull( connection );
@@ -59,19 +60,20 @@ public InternalStatementResultCursorFactory( Connection connection, Message runM
}
@Override
- public CompletionStage asyncResult()
+ public CompletionStage asyncResult()
{
// only write and flush messages when async result is wanted.
- connection.writeAndFlush( runMessage, runHandler, PullMessage.PULL_ALL, pullAllHandler );
+ connection.write( runMessage, runHandler ); // queues the run message, will be flushed with pull message together
+ pullAllHandler.prePopulateRecords();
if ( waitForRunResponse )
{
// wait for response of RUN before proceeding
- return runHandler.runFuture().thenApply( ignore -> new AsyncStatementResultCursor( runHandler, pullAllHandler ) );
+ return runHandler.runFuture().thenApply( ignore -> new AsyncStatementResultCursorImpl( runHandler, pullAllHandler ) );
}
else
{
- return completedFuture( new AsyncStatementResultCursor( runHandler, pullAllHandler ) );
+ return completedFuture( new AsyncStatementResultCursorImpl( runHandler, pullAllHandler ) );
}
}
@@ -85,6 +87,6 @@ public CompletionStage rxResult()
private RxStatementResultCursor composeRxCursor( Throwable runError )
{
- return new RxStatementResultCursor( runError, runHandler, pullHandler );
+ return new RxStatementResultCursorImpl( runError, runHandler, pullHandler );
}
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/AbstractPullAllResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/LegacyPullAllResponseHandler.java
similarity index 92%
rename from driver/src/main/java/org/neo4j/driver/internal/handlers/AbstractPullAllResponseHandler.java
rename to driver/src/main/java/org/neo4j/driver/internal/handlers/LegacyPullAllResponseHandler.java
index 7ec9c43431..00c04dfb05 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/handlers/AbstractPullAllResponseHandler.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/LegacyPullAllResponseHandler.java
@@ -27,14 +27,15 @@
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
+import org.neo4j.driver.Record;
+import org.neo4j.driver.Statement;
+import org.neo4j.driver.Value;
import org.neo4j.driver.internal.InternalRecord;
+import org.neo4j.driver.internal.messaging.request.PullAllMessage;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.internal.util.Iterables;
import org.neo4j.driver.internal.util.MetadataExtractor;
-import org.neo4j.driver.Record;
-import org.neo4j.driver.Statement;
-import org.neo4j.driver.Value;
import org.neo4j.driver.summary.ResultSummary;
import static java.util.Collections.emptyMap;
@@ -43,7 +44,10 @@
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
import static org.neo4j.driver.internal.util.Futures.failedFuture;
-public abstract class AbstractPullAllResponseHandler implements PullAllResponseHandler
+/**
+ * This is the Pull All response handler that handles pull all messages in Bolt v3 and previous protocol versions.
+ */
+public class LegacyPullAllResponseHandler implements PullAllResponseHandler
{
private static final Queue UNINITIALIZED_RECORDS = Iterables.emptyQueue();
@@ -54,6 +58,7 @@ public abstract class AbstractPullAllResponseHandler implements PullAllResponseH
private final RunResponseHandler runResponseHandler;
protected final MetadataExtractor metadataExtractor;
protected final Connection connection;
+ private final PullResponseCompletionListener completionListener;
// initialized lazily when first record arrives
private Queue records = UNINITIALIZED_RECORDS;
@@ -67,12 +72,14 @@ public abstract class AbstractPullAllResponseHandler implements PullAllResponseH
private CompletableFuture recordFuture;
private CompletableFuture failureFuture;
- public AbstractPullAllResponseHandler( Statement statement, RunResponseHandler runResponseHandler, Connection connection, MetadataExtractor metadataExtractor )
+ public LegacyPullAllResponseHandler( Statement statement, RunResponseHandler runResponseHandler, Connection connection, MetadataExtractor metadataExtractor,
+ PullResponseCompletionListener completionListener )
{
this.statement = requireNonNull( statement );
this.runResponseHandler = requireNonNull( runResponseHandler );
this.metadataExtractor = requireNonNull( metadataExtractor );
this.connection = requireNonNull( connection );
+ this.completionListener = requireNonNull( completionListener );
}
@Override
@@ -87,21 +94,19 @@ public synchronized void onSuccess( Map metadata )
finished = true;
summary = extractResultSummary( metadata );
- afterSuccess( metadata );
+ completionListener.afterSuccess( metadata );
completeRecordFuture( null );
completeFailureFuture( null );
}
- protected abstract void afterSuccess( Map metadata );
-
@Override
public synchronized void onFailure( Throwable error )
{
finished = true;
summary = extractResultSummary( emptyMap() );
- afterFailure( error );
+ completionListener.afterFailure( error );
boolean failedRecordFuture = failRecordFuture( error );
if ( failedRecordFuture )
@@ -120,8 +125,6 @@ public synchronized void onFailure( Throwable error )
}
}
- protected abstract void afterFailure( Throwable error );
-
@Override
public synchronized void onRecord( Value[] fields )
{
@@ -177,6 +180,8 @@ public synchronized CompletionStage nextAsync()
public synchronized CompletionStage summaryAsync()
{
+ ignoreRecords = true;
+ records.clear();
return failureAsync().thenApply( error ->
{
if ( error != null )
@@ -187,13 +192,6 @@ public synchronized CompletionStage summaryAsync()
} );
}
- public synchronized CompletionStage consumeAsync()
- {
- ignoreRecords = true;
- records.clear();
- return summaryAsync();
- }
-
public synchronized CompletionStage> listAsync( Function mapFunction )
{
return failureAsync().thenApply( error ->
@@ -206,6 +204,12 @@ public synchronized CompletionStage> listAsync( Function m
} );
}
+ @Override
+ public void prePopulateRecords()
+ {
+ connection.writeAndFlush( PullAllMessage.PULL_ALL, this );
+ }
+
public synchronized CompletionStage failureAsync()
{
if ( failure != null )
diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java
index 0dcbd3b384..05466cc5db 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java
@@ -20,11 +20,11 @@
import java.util.List;
import java.util.concurrent.CompletionStage;
+import java.util.function.Function;
-import org.neo4j.driver.internal.spi.ResponseHandler;
import org.neo4j.driver.Record;
+import org.neo4j.driver.internal.spi.ResponseHandler;
import org.neo4j.driver.summary.ResultSummary;
-import java.util.function.Function;
public interface PullAllResponseHandler extends ResponseHandler
{
@@ -34,9 +34,9 @@ public interface PullAllResponseHandler extends ResponseHandler
CompletionStage peekAsync();
- CompletionStage consumeAsync();
-
CompletionStage> listAsync( Function mapFunction );
CompletionStage failureAsync();
+
+ void prePopulateRecords();
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullHandlers.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullHandlers.java
index 934449bdde..fbcc22baec 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullHandlers.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullHandlers.java
@@ -21,43 +21,51 @@
import org.neo4j.driver.Statement;
import org.neo4j.driver.internal.BookmarkHolder;
import org.neo4j.driver.internal.async.ExplicitTransaction;
+import org.neo4j.driver.internal.handlers.pulln.AutoPullResponseHandler;
import org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler;
-import org.neo4j.driver.internal.handlers.pulln.SessionPullResponseHandler;
-import org.neo4j.driver.internal.handlers.pulln.TransactionPullResponseHandler;
+import org.neo4j.driver.internal.handlers.pulln.PullResponseHandler;
import org.neo4j.driver.internal.messaging.v1.BoltProtocolV1;
import org.neo4j.driver.internal.messaging.v3.BoltProtocolV3;
import org.neo4j.driver.internal.spi.Connection;
public class PullHandlers
{
- public static AbstractPullAllResponseHandler newBoltV1PullAllHandler( Statement statement, RunResponseHandler runHandler,
+ public static PullAllResponseHandler newBoltV1PullAllHandler( Statement statement, RunResponseHandler runHandler,
Connection connection, ExplicitTransaction tx )
{
- if ( tx != null )
- {
- return new TransactionPullAllResponseHandler( statement, runHandler, connection, tx, BoltProtocolV1.METADATA_EXTRACTOR );
- }
- return new SessionPullAllResponseHandler( statement, runHandler, connection, BookmarkHolder.NO_OP, BoltProtocolV1.METADATA_EXTRACTOR );
+ PullResponseCompletionListener completionListener = createPullResponseCompletionListener( connection, BookmarkHolder.NO_OP, tx );
+
+ return new LegacyPullAllResponseHandler( statement, runHandler, connection, BoltProtocolV1.METADATA_EXTRACTOR, completionListener );
}
- public static AbstractPullAllResponseHandler newBoltV3PullAllHandler( Statement statement, RunResponseHandler runHandler, Connection connection,
+ public static PullAllResponseHandler newBoltV3PullAllHandler( Statement statement, RunResponseHandler runHandler, Connection connection,
BookmarkHolder bookmarkHolder, ExplicitTransaction tx )
{
- if ( tx != null )
- {
- return new TransactionPullAllResponseHandler( statement, runHandler, connection, tx, BoltProtocolV3.METADATA_EXTRACTOR );
- }
- return new SessionPullAllResponseHandler( statement, runHandler, connection, bookmarkHolder, BoltProtocolV3.METADATA_EXTRACTOR );
+ PullResponseCompletionListener completionListener = createPullResponseCompletionListener( connection, bookmarkHolder, tx );
+
+ return new LegacyPullAllResponseHandler( statement, runHandler, connection, BoltProtocolV3.METADATA_EXTRACTOR, completionListener );
+ }
+
+ public static PullAllResponseHandler newBoltV4AutoPullHandler( Statement statement, RunResponseHandler runHandler, Connection connection,
+ BookmarkHolder bookmarkHolder, ExplicitTransaction tx, long fetchSize )
+ {
+ PullResponseCompletionListener completionListener = createPullResponseCompletionListener( connection, bookmarkHolder, tx );
+
+ return new AutoPullResponseHandler( statement, runHandler, connection, BoltProtocolV3.METADATA_EXTRACTOR, completionListener, fetchSize );
}
- public static BasicPullResponseHandler newBoltV4PullHandler( Statement statement, RunResponseHandler runHandler, Connection connection,
+
+ public static PullResponseHandler newBoltV4BasicPullHandler( Statement statement, RunResponseHandler runHandler, Connection connection,
BookmarkHolder bookmarkHolder, ExplicitTransaction tx )
{
- if ( tx != null )
- {
- return new TransactionPullResponseHandler( statement, runHandler, connection, tx, BoltProtocolV3.METADATA_EXTRACTOR );
- }
- return new SessionPullResponseHandler( statement, runHandler, connection, bookmarkHolder, BoltProtocolV3.METADATA_EXTRACTOR );
+ PullResponseCompletionListener completionListener = createPullResponseCompletionListener( connection, bookmarkHolder, tx );
+
+ return new BasicPullResponseHandler( statement, runHandler, connection, BoltProtocolV3.METADATA_EXTRACTOR, completionListener );
}
+ private static PullResponseCompletionListener createPullResponseCompletionListener( Connection connection, BookmarkHolder bookmarkHolder,
+ ExplicitTransaction tx )
+ {
+ return tx != null ? new TransactionPullResponseCompletionListener( tx ) : new SessionPullResponseCompletionListener( connection, bookmarkHolder );
+ }
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullResponseCompletionListener.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullResponseCompletionListener.java
new file mode 100644
index 0000000000..1c3297c92b
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullResponseCompletionListener.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2002-2019 "Neo4j,"
+ * Neo4j Sweden AB [http://neo4j.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.handlers;
+
+import java.util.Map;
+
+import org.neo4j.driver.Value;
+
+public interface PullResponseCompletionListener
+{
+ void afterSuccess( Map metadata );
+
+ void afterFailure( Throwable error );
+
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullResponseCompletionListener.java
similarity index 69%
rename from driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandler.java
rename to driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullResponseCompletionListener.java
index 81a396a074..53a3cd5644 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandler.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullResponseCompletionListener.java
@@ -20,34 +20,33 @@
import java.util.Map;
+import org.neo4j.driver.Value;
import org.neo4j.driver.internal.BookmarkHolder;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.util.MetadataExtractor;
-import org.neo4j.driver.Statement;
-import org.neo4j.driver.Value;
import static java.util.Objects.requireNonNull;
-public class SessionPullAllResponseHandler extends AbstractPullAllResponseHandler
+public class SessionPullResponseCompletionListener implements PullResponseCompletionListener
{
private final BookmarkHolder bookmarkHolder;
+ private final Connection connection;
- public SessionPullAllResponseHandler( Statement statement, RunResponseHandler runResponseHandler,
- Connection connection, BookmarkHolder bookmarkHolder, MetadataExtractor metadataExtractor )
+ public SessionPullResponseCompletionListener( Connection connection, BookmarkHolder bookmarkHolder )
{
- super( statement, runResponseHandler, connection, metadataExtractor );
+ this.connection = requireNonNull( connection );
this.bookmarkHolder = requireNonNull( bookmarkHolder );
}
@Override
- protected void afterSuccess( Map metadata )
+ public void afterSuccess( Map metadata )
{
releaseConnection();
- bookmarkHolder.setBookmark( metadataExtractor.extractBookmarks( metadata ) );
+ bookmarkHolder.setBookmark( MetadataExtractor.extractBookmarks( metadata ) );
}
@Override
- protected void afterFailure( Throwable error )
+ public void afterFailure( Throwable error )
{
releaseConnection();
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullResponseCompletionListener.java
similarity index 68%
rename from driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandler.java
rename to driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullResponseCompletionListener.java
index c9ee224487..73252c3614 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandler.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullResponseCompletionListener.java
@@ -20,32 +20,27 @@
import java.util.Map;
-import org.neo4j.driver.Statement;
import org.neo4j.driver.Value;
import org.neo4j.driver.internal.async.ExplicitTransaction;
-import org.neo4j.driver.internal.spi.Connection;
-import org.neo4j.driver.internal.util.MetadataExtractor;
import static java.util.Objects.requireNonNull;
-public class TransactionPullAllResponseHandler extends AbstractPullAllResponseHandler
+public class TransactionPullResponseCompletionListener implements PullResponseCompletionListener
{
private final ExplicitTransaction tx;
- public TransactionPullAllResponseHandler( Statement statement, RunResponseHandler runResponseHandler,
- Connection connection, ExplicitTransaction tx, MetadataExtractor metadataExtractor )
+ public TransactionPullResponseCompletionListener( ExplicitTransaction tx )
{
- super( statement, runResponseHandler, connection, metadataExtractor );
this.tx = requireNonNull( tx );
}
@Override
- protected void afterSuccess( Map metadata )
+ public void afterSuccess( Map metadata )
{
}
@Override
- protected void afterFailure( Throwable error )
+ public void afterFailure( Throwable error )
{
// always mark transaction as terminated because every error is "acknowledged" with a RESET message
// so database forgets about the transaction after the first error
diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/AbstractBasicPullResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/AbstractBasicPullResponseHandler.java
deleted file mode 100644
index 7ee433d2f1..0000000000
--- a/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/AbstractBasicPullResponseHandler.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- * Copyright (c) 2002-2019 "Neo4j,"
- * Neo4j Sweden AB [http://neo4j.com]
- *
- * This file is part of Neo4j.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.neo4j.driver.internal.handlers.pulln;
-
-import java.util.Map;
-import java.util.function.BiConsumer;
-
-import org.neo4j.driver.internal.InternalRecord;
-import org.neo4j.driver.internal.handlers.RunResponseHandler;
-import org.neo4j.driver.internal.messaging.request.PullMessage;
-import org.neo4j.driver.internal.spi.Connection;
-import org.neo4j.driver.internal.util.MetadataExtractor;
-import org.neo4j.driver.internal.value.BooleanValue;
-import org.neo4j.driver.Record;
-import org.neo4j.driver.Statement;
-import org.neo4j.driver.Value;
-import org.neo4j.driver.summary.ResultSummary;
-
-import static java.lang.String.format;
-import static java.util.Collections.emptyMap;
-import static java.util.Objects.requireNonNull;
-import static org.neo4j.driver.internal.messaging.request.DiscardMessage.newDiscardAllMessage;
-
-/**
- * In this class we have a hidden state machine.
- * Here is how it looks like:
- * | | DONE | FAILED | STREAMING | READY | CANCELED |
- * |--------------------|------|--------|--------------------------------|--------------------|----------------|
- * | request | X | X | toRequest++ ->STREAMING | PULL ->STREAMING | X |
- * | cancel | X | X | ->CANCELED | DISCARD ->CANCELED | ->CANCELED |
- * | onSuccess has_more | X | X | ->READY request if toRequest>0 | X | ->READY cancel |
- * | onSuccess | X | X | summary ->DONE | X | summary ->DONE |
- * | onRecord | X | X | yield record ->STREAMING | X | ->CANCELED |
- * | onFailure | X | X | ->FAILED | X | ->FAILED |
- *
- * Currently the error state (marked with X on the table above) might not be enforced.
- */
-public abstract class AbstractBasicPullResponseHandler implements BasicPullResponseHandler
-{
- public static final BiConsumer DISCARD_RECORD_CONSUMER = ( record, throwable ) -> {/*do nothing*/};
-
- private final Statement statement;
- protected final RunResponseHandler runResponseHandler;
- protected final MetadataExtractor metadataExtractor;
- protected final Connection connection;
-
- private Status status = Status.READY;
- private long toRequest;
- private BiConsumer recordConsumer = null;
- private BiConsumer summaryConsumer = null;
-
- protected abstract void afterSuccess( Map metadata );
-
- protected abstract void afterFailure( Throwable error );
-
- public AbstractBasicPullResponseHandler( Statement statement, RunResponseHandler runResponseHandler, Connection connection, MetadataExtractor metadataExtractor )
- {
- this.statement = requireNonNull( statement );
- this.runResponseHandler = requireNonNull( runResponseHandler );
- this.metadataExtractor = requireNonNull( metadataExtractor );
- this.connection = requireNonNull( connection );
- }
-
- @Override
- public synchronized void onSuccess( Map metadata )
- {
- assertRecordAndSummaryConsumerInstalled();
- if ( metadata.getOrDefault( "has_more", BooleanValue.FALSE ).asBoolean() )
- {
- handleSuccessWithHasMore();
- }
- else
- {
- handleSuccessWithSummary( metadata );
- }
- }
-
- @Override
- public synchronized void onFailure( Throwable error )
- {
- assertRecordAndSummaryConsumerInstalled();
- status = Status.FAILED;
- afterFailure( error );
-
- complete( extractResultSummary( emptyMap() ), error );
- }
-
- @Override
- public synchronized void onRecord( Value[] fields )
- {
- assertRecordAndSummaryConsumerInstalled();
- if ( isStreaming() )
- {
- Record record = new InternalRecord( runResponseHandler.statementKeys(), fields );
- recordConsumer.accept( record, null );
- }
- }
-
- @Override
- public synchronized void request( long size )
- {
- assertRecordAndSummaryConsumerInstalled();
- if ( isStreamingPaused() )
- {
- connection.writeAndFlush( new PullMessage( size, runResponseHandler.statementId() ), this );
- status = Status.STREAMING;
- }
- else if ( isStreaming() )
- {
- addToRequest( size );
- }
- }
-
- @Override
- public synchronized void cancel()
- {
- assertRecordAndSummaryConsumerInstalled();
- if ( isStreamingPaused() )
- {
- // Reactive API does not provide a way to discard N. Only discard all.
- connection.writeAndFlush( newDiscardAllMessage( runResponseHandler.statementId() ), this );
- status = Status.CANCELED;
- }
- else if ( isStreaming() )
- {
- status = Status.CANCELED;
- }
- // no need to change status if it is already done
- }
-
- @Override
- public synchronized void installSummaryConsumer( BiConsumer summaryConsumer )
- {
- if( this.summaryConsumer != null )
- {
- throw new IllegalStateException( "Summary consumer already installed." );
- }
- this.summaryConsumer = summaryConsumer;
- }
-
- @Override
- public synchronized void installRecordConsumer( BiConsumer recordConsumer )
- {
- if( this.recordConsumer != null )
- {
- throw new IllegalStateException( "Record consumer already installed." );
- }
- this.recordConsumer = recordConsumer;
- }
-
- private boolean isStreaming()
- {
- return status == Status.STREAMING;
- }
-
- private boolean isStreamingPaused()
- {
- return status == Status.READY;
- }
-
- private boolean isFinished()
- {
- return status == Status.DONE || status == Status.FAILED;
- }
-
- private void handleSuccessWithSummary( Map metadata )
- {
- status = Status.DONE;
- afterSuccess( metadata );
- ResultSummary summary = extractResultSummary( metadata );
-
- complete( summary, null );
- }
-
- private void handleSuccessWithHasMore()
- {
- if ( this.status == Status.CANCELED )
- {
- this.status = Status.READY; // cancel request accepted.
- cancel();
- }
- else if ( this.status == Status.STREAMING )
- {
- this.status = Status.READY;
- if ( toRequest > 0 )
- {
- request( toRequest );
- toRequest = 0;
- }
- // summary consumer use (null, null) to identify done handling of success with has_more
- summaryConsumer.accept( null, null );
- }
- }
-
- private ResultSummary extractResultSummary( Map metadata )
- {
- long resultAvailableAfter = runResponseHandler.resultAvailableAfter();
- return metadataExtractor.extractSummary( statement, connection, resultAvailableAfter, metadata );
- }
-
- private void addToRequest( long toAdd )
- {
- if ( toAdd <= 0 )
- {
- throw new IllegalArgumentException( "Cannot request record amount that is less than or equal to 0. Request amount: " + toAdd );
- }
- toRequest += toAdd;
- if ( toRequest <= 0 ) // toAdd is already at least 1, we hit buffer overflow
- {
- toRequest = Long.MAX_VALUE;
- }
- }
-
- private void assertRecordAndSummaryConsumerInstalled()
- {
- if( isFinished() )
- {
- // no need to check if we've finished.
- return;
- }
- if( recordConsumer == null || summaryConsumer == null )
- {
- throw new IllegalStateException( format("Access record stream without record consumer and/or summary consumer. " +
- "Record consumer=%s, Summary consumer=%s", recordConsumer, summaryConsumer) );
- }
- }
-
- private void complete( ResultSummary summary, Throwable error )
- {
- // we first inform the summary consumer to ensure when streaming finished, summary is definitely available.
- if ( recordConsumer == DISCARD_RECORD_CONSUMER )
- {
- // we will report the error to summary if there is no record consumer
- summaryConsumer.accept( summary, error );
- }
- else
- {
- // we will not inform the error to summary as the error will be reported to record consumer
- summaryConsumer.accept( summary, null );
- }
-
- // record consumer use (null, null) to identify the end of record stream
- recordConsumer.accept( null, error );
- dispose();
- }
-
- private void dispose()
- {
- // release the reference to the consumers who hold the reference to subscribers which shall be released when subscription is completed.
- this.recordConsumer = null;
- this.summaryConsumer = null;
- }
-
- protected Status status()
- {
- return this.status;
- }
-
- protected void status( Status status )
- {
- this.status = status;
- }
-}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/AutoPullResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/AutoPullResponseHandler.java
new file mode 100644
index 0000000000..9aaea3009f
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/AutoPullResponseHandler.java
@@ -0,0 +1,297 @@
+/*
+ * Copyright (c) 2002-2019 "Neo4j,"
+ * Neo4j Sweden AB [http://neo4j.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.handlers.pulln;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.function.Function;
+
+import org.neo4j.driver.Record;
+import org.neo4j.driver.Statement;
+import org.neo4j.driver.internal.handlers.PullAllResponseHandler;
+import org.neo4j.driver.internal.handlers.PullResponseCompletionListener;
+import org.neo4j.driver.internal.handlers.RunResponseHandler;
+import org.neo4j.driver.internal.spi.Connection;
+import org.neo4j.driver.internal.util.Iterables;
+import org.neo4j.driver.internal.util.MetadataExtractor;
+import org.neo4j.driver.summary.ResultSummary;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.neo4j.driver.internal.handlers.pulln.FetchSizeUtil.UNLIMITED_FETCH_SIZE;
+import static org.neo4j.driver.internal.util.Futures.completedWithNull;
+import static org.neo4j.driver.internal.util.Futures.failedFuture;
+
+/**
+ * Built on top of {@link BasicPullResponseHandler} to be able to pull in batches.
+ * It is exposed as {@link PullAllResponseHandler} as it can automatically pull when running out of records locally.
+ */
+public class AutoPullResponseHandler extends BasicPullResponseHandler implements PullAllResponseHandler
+{
+ private static final Queue UNINITIALIZED_RECORDS = Iterables.emptyQueue();
+ private final long fetchSize;
+
+ // initialized lazily when first record arrives
+ private Queue records = UNINITIALIZED_RECORDS;
+
+ private ResultSummary summary;
+ private Throwable failure;
+
+ private CompletableFuture recordFuture;
+ private CompletableFuture summaryFuture;
+
+ public AutoPullResponseHandler( Statement statement, RunResponseHandler runResponseHandler, Connection connection, MetadataExtractor metadataExtractor,
+ PullResponseCompletionListener completionListener, long fetchSize )
+ {
+ super( statement, runResponseHandler, connection, metadataExtractor, completionListener );
+ this.fetchSize = fetchSize;
+ installRecordAndSummaryConsumers();
+ }
+
+ private void installRecordAndSummaryConsumers()
+ {
+ installRecordConsumer( ( record, error ) -> {
+ if ( record != null )
+ {
+ enqueueRecord( record );
+ completeRecordFuture( record );
+ }
+ // if ( error != null ) Handled by summary.error already
+ if ( record == null && error == null )
+ {
+ // complete
+ completeRecordFuture( null );
+ }
+ } );
+
+ installSummaryConsumer( ( summary, error ) -> {
+ if ( error != null )
+ {
+ handleFailure( error );
+ }
+ if ( summary != null )
+ {
+ this.summary = summary;
+ completeSummaryFuture( summary );
+ }
+
+ if ( error == null && summary == null ) // has_more
+ {
+ request( fetchSize );
+ }
+ } );
+ }
+
+ private void handleFailure( Throwable error )
+ {
+ // error has not been propagated to the user, remember it
+ if ( !failRecordFuture( error ) && !failSummaryFuture( error ) )
+ {
+ failure = error;
+ }
+ }
+
+ public synchronized CompletionStage peekAsync()
+ {
+ Record record = records.peek();
+ if ( record == null )
+ {
+ if ( isDone() )
+ {
+ return completedWithValueIfNoFailure( null );
+ }
+
+ if ( recordFuture == null )
+ {
+ recordFuture = new CompletableFuture<>();
+ }
+ return recordFuture;
+ }
+ else
+ {
+ return completedFuture( record );
+ }
+ }
+
+ public synchronized CompletionStage nextAsync()
+ {
+ return peekAsync().thenApply( ignore -> dequeueRecord() );
+ }
+
+ public synchronized CompletionStage summaryAsync()
+ {
+ if ( isDone() )
+ {
+ records.clear();
+ return completedWithValueIfNoFailure( summary );
+ }
+ else
+ {
+ cancel();
+ records.clear();
+ if ( summaryFuture == null )
+ {
+ summaryFuture = new CompletableFuture<>();
+ }
+
+ return summaryFuture;
+ }
+ }
+
+ public synchronized CompletionStage> listAsync( Function mapFunction )
+ {
+ return pullAllAsync().thenApply( summary -> recordsAsList( mapFunction ) );
+ }
+
+ @Override
+ public synchronized CompletionStage failureAsync()
+ {
+ return pullAllAsync().handle( ( ignore, error ) -> error );
+ }
+
+ @Override
+ public void prePopulateRecords()
+ {
+ request( fetchSize );
+ }
+
+ private synchronized CompletionStage pullAllAsync()
+ {
+ if ( isDone() )
+ {
+ return completedWithValueIfNoFailure( summary );
+ }
+ else
+ {
+ request( UNLIMITED_FETCH_SIZE );
+ if ( summaryFuture == null )
+ {
+ summaryFuture = new CompletableFuture<>();
+ }
+
+ return summaryFuture;
+ }
+ }
+
+ private void enqueueRecord( Record record )
+ {
+ if ( records == UNINITIALIZED_RECORDS )
+ {
+ records = new ArrayDeque<>();
+ }
+
+ records.add( record );
+ }
+
+ private Record dequeueRecord()
+ {
+ return records.poll();
+ }
+
+ private List recordsAsList( Function mapFunction )
+ {
+ if ( !isDone() )
+ {
+ throw new IllegalStateException( "Can't get records as list because SUCCESS or FAILURE did not arrive" );
+ }
+
+ List result = new ArrayList<>( records.size() );
+ while ( !records.isEmpty() )
+ {
+ Record record = records.poll();
+ result.add( mapFunction.apply( record ) );
+ }
+ return result;
+ }
+
+ private Throwable extractFailure()
+ {
+ if ( failure == null )
+ {
+ throw new IllegalStateException( "Can't extract failure because it does not exist" );
+ }
+
+ Throwable error = failure;
+ failure = null; // propagate failure only once
+ return error;
+ }
+
+ private void completeRecordFuture( Record record )
+ {
+ if ( recordFuture != null )
+ {
+ CompletableFuture future = recordFuture;
+ recordFuture = null;
+ future.complete( record );
+ }
+ }
+
+ private void completeSummaryFuture( ResultSummary summary )
+ {
+ if ( summaryFuture != null )
+ {
+ CompletableFuture future = summaryFuture;
+ summaryFuture = null;
+ future.complete( summary );
+ }
+ }
+
+ private boolean failRecordFuture( Throwable error )
+ {
+ if ( recordFuture != null )
+ {
+ CompletableFuture future = recordFuture;
+ recordFuture = null;
+ future.completeExceptionally( error );
+ return true;
+ }
+ return false;
+ }
+
+ private boolean failSummaryFuture( Throwable error )
+ {
+ if ( summaryFuture != null )
+ {
+ CompletableFuture future = summaryFuture;
+ summaryFuture = null;
+ future.completeExceptionally( error );
+ return true;
+ }
+ return false;
+ }
+
+ private CompletionStage completedWithValueIfNoFailure( T value )
+ {
+ if ( failure != null )
+ {
+ return failedFuture( extractFailure() );
+ }
+ else if ( value == null )
+ {
+ return completedWithNull();
+ }
+ else
+ {
+ return completedFuture( value );
+ }
+ }
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/BasicPullResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/BasicPullResponseHandler.java
index f0bc3b700b..29eaee281a 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/BasicPullResponseHandler.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/BasicPullResponseHandler.java
@@ -18,38 +18,262 @@
*/
package org.neo4j.driver.internal.handlers.pulln;
-import org.reactivestreams.Subscription;
-
+import java.util.Map;
import java.util.function.BiConsumer;
-import org.neo4j.driver.internal.spi.ResponseHandler;
import org.neo4j.driver.Record;
+import org.neo4j.driver.Statement;
+import org.neo4j.driver.Value;
+import org.neo4j.driver.internal.InternalRecord;
+import org.neo4j.driver.internal.handlers.PullResponseCompletionListener;
+import org.neo4j.driver.internal.handlers.RunResponseHandler;
+import org.neo4j.driver.internal.messaging.request.PullMessage;
+import org.neo4j.driver.internal.spi.Connection;
+import org.neo4j.driver.internal.util.MetadataExtractor;
+import org.neo4j.driver.internal.value.BooleanValue;
import org.neo4j.driver.summary.ResultSummary;
-public interface BasicPullResponseHandler extends ResponseHandler, Subscription
+import static java.lang.String.format;
+import static java.util.Collections.emptyMap;
+import static java.util.Objects.requireNonNull;
+import static org.neo4j.driver.internal.handlers.pulln.FetchSizeUtil.UNLIMITED_FETCH_SIZE;
+import static org.neo4j.driver.internal.messaging.request.DiscardMessage.newDiscardAllMessage;
+
+/**
+ * In this class we have a hidden state machine.
+ * Here is how it looks like:
+ * | | DONE | FAILED | STREAMING | READY | CANCELED |
+ * |--------------------|------|--------|--------------------------------|--------------------|----------------|
+ * | request | X | X | toRequest++ ->STREAMING | PULL ->STREAMING | X |
+ * | cancel | X | X | ->CANCELED | DISCARD ->CANCELED | ->CANCELED |
+ * | onSuccess has_more | X | X | ->READY request if toRequest>0 | X | ->READY cancel |
+ * | onSuccess | X | X | summary ->DONE | X | summary ->DONE |
+ * | onRecord | X | X | yield record ->STREAMING | X | ->CANCELED |
+ * | onFailure | X | X | ->FAILED | X | ->FAILED |
+ *
+ * Currently the error state (marked with X on the table above) might not be enforced.
+ */
+public class BasicPullResponseHandler implements PullResponseHandler
{
- /**
- * Register a record consumer for each record received.
- * STREAMING shall not be started before this consumer is registered.
- * A null record with no error indicates the end of streaming.
- * @param recordConsumer register a record consumer to be notified for each record received.
- */
- void installRecordConsumer( BiConsumer recordConsumer );
-
- /**
- * Register a summary consumer to be notified when a summary is received.
- * STREAMING shall not be started before this consumer is registered.
- * A null summary with no error indicates a SUCCESS message with has_more=true has arrived.
- * @param summaryConsumer register a summary consumer
- */
- void installSummaryConsumer( BiConsumer summaryConsumer );
-
- enum Status
- {
- DONE, // successfully completed
- FAILED, // failed
- CANCELED, // canceled
- STREAMING, // streaming records
- READY // steaming is paused. ready to accept request or cancel commands from user
+ private final Statement statement;
+ protected final RunResponseHandler runResponseHandler;
+ protected final MetadataExtractor metadataExtractor;
+ protected final Connection connection;
+ private final PullResponseCompletionListener completionListener;
+
+ private Status status = Status.READY;
+ private long toRequest;
+ private BiConsumer recordConsumer = null;
+ private BiConsumer summaryConsumer = null;
+
+ public BasicPullResponseHandler( Statement statement, RunResponseHandler runResponseHandler, Connection connection, MetadataExtractor metadataExtractor,
+ PullResponseCompletionListener completionListener )
+ {
+ this.statement = requireNonNull( statement );
+ this.runResponseHandler = requireNonNull( runResponseHandler );
+ this.metadataExtractor = requireNonNull( metadataExtractor );
+ this.connection = requireNonNull( connection );
+ this.completionListener = requireNonNull( completionListener );
+ }
+
+ @Override
+ public synchronized void onSuccess( Map metadata )
+ {
+ assertRecordAndSummaryConsumerInstalled();
+ if ( metadata.getOrDefault( "has_more", BooleanValue.FALSE ).asBoolean() )
+ {
+ handleSuccessWithHasMore();
+ }
+ else
+ {
+ handleSuccessWithSummary( metadata );
+ }
+ }
+
+ @Override
+ public synchronized void onFailure( Throwable error )
+ {
+ assertRecordAndSummaryConsumerInstalled();
+ status = Status.FAILED;
+ completionListener.afterFailure( error );
+
+ complete( extractResultSummary( emptyMap() ), error );
+ }
+
+ @Override
+ public synchronized void onRecord( Value[] fields )
+ {
+ assertRecordAndSummaryConsumerInstalled();
+ if ( isStreaming() )
+ {
+ Record record = new InternalRecord( runResponseHandler.statementKeys(), fields );
+ recordConsumer.accept( record, null );
+ }
+ }
+
+ @Override
+ public synchronized void request( long size )
+ {
+ assertRecordAndSummaryConsumerInstalled();
+ if ( isStreamingPaused() )
+ {
+ status = Status.STREAMING;
+ connection.writeAndFlush( new PullMessage( size, runResponseHandler.statementId() ), this );
+ }
+ else if ( isStreaming() )
+ {
+ addToRequest( size );
+ }
+ }
+
+ @Override
+ public synchronized void cancel()
+ {
+ assertRecordAndSummaryConsumerInstalled();
+ if ( isStreamingPaused() )
+ {
+ status = Status.CANCELED;
+ // Reactive API does not provide a way to discard N. Only discard all.
+ connection.writeAndFlush( newDiscardAllMessage( runResponseHandler.statementId() ), this );
+ }
+ else if ( isStreaming() )
+ {
+ status = Status.CANCELED;
+ }
+ // no need to change status if it is already done
+ }
+
+ @Override
+ public synchronized void installSummaryConsumer( BiConsumer summaryConsumer )
+ {
+ if( this.summaryConsumer != null )
+ {
+ throw new IllegalStateException( "Summary consumer already installed." );
+ }
+ this.summaryConsumer = summaryConsumer;
+ }
+
+ @Override
+ public synchronized void installRecordConsumer( BiConsumer recordConsumer )
+ {
+ if( this.recordConsumer != null )
+ {
+ throw new IllegalStateException( "Record consumer already installed." );
+ }
+ this.recordConsumer = recordConsumer;
+ }
+
+ private boolean isStreaming()
+ {
+ return status == Status.STREAMING;
+ }
+
+ private boolean isStreamingPaused()
+ {
+ return status == Status.READY;
+ }
+
+ protected boolean isDone()
+ {
+ return status == Status.SUCCEEDED || status == Status.FAILED;
+ }
+
+ private void handleSuccessWithSummary( Map metadata )
+ {
+ status = Status.SUCCEEDED;
+ completionListener.afterSuccess( metadata );
+ ResultSummary summary = extractResultSummary( metadata );
+
+ complete( summary, null );
+ }
+
+ private void handleSuccessWithHasMore()
+ {
+ if ( this.status == Status.CANCELED )
+ {
+ this.status = Status.READY; // cancel request accepted.
+ cancel();
+ }
+ else if ( this.status == Status.STREAMING )
+ {
+ this.status = Status.READY;
+ if ( toRequest > 0 || toRequest == UNLIMITED_FETCH_SIZE )
+ {
+ request( toRequest );
+ toRequest = 0;
+ }
+ // summary consumer use (null, null) to identify done handling of success with has_more
+ summaryConsumer.accept( null, null );
+ }
+ }
+
+ private ResultSummary extractResultSummary( Map metadata )
+ {
+ long resultAvailableAfter = runResponseHandler.resultAvailableAfter();
+ return metadataExtractor.extractSummary( statement, connection, resultAvailableAfter, metadata );
+ }
+
+ private void addToRequest( long toAdd )
+ {
+ if ( toRequest == UNLIMITED_FETCH_SIZE )
+ {
+ return;
+ }
+ if ( toAdd == UNLIMITED_FETCH_SIZE )
+ {
+ // pull all
+ toRequest = UNLIMITED_FETCH_SIZE;
+ return;
+ }
+
+ if ( toAdd <= 0 )
+ {
+ throw new IllegalArgumentException( "Cannot request record amount that is less than or equal to 0. Request amount: " + toAdd );
+ }
+ toRequest += toAdd;
+ if ( toRequest <= 0 ) // toAdd is already at least 1, we hit buffer overflow
+ {
+ toRequest = Long.MAX_VALUE;
+ }
+ }
+
+ private void assertRecordAndSummaryConsumerInstalled()
+ {
+ if( isDone() )
+ {
+ // no need to check if we've finished.
+ return;
+ }
+ if( recordConsumer == null || summaryConsumer == null )
+ {
+ throw new IllegalStateException( format("Access record stream without record consumer and/or summary consumer. " +
+ "Record consumer=%s, Summary consumer=%s", recordConsumer, summaryConsumer) );
+ }
+ }
+
+ private void complete( ResultSummary summary, Throwable error )
+ {
+ // we first inform the summary consumer to ensure when streaming finished, summary is definitely available.
+ summaryConsumer.accept( summary, error );
+ // record consumer use (null, null) to identify the end of record stream
+ recordConsumer.accept( null, error );
+ dispose();
+ }
+
+ private void dispose()
+ {
+ // release the reference to the consumers who hold the reference to subscribers which shall be released when subscription is completed.
+ this.recordConsumer = null;
+ this.summaryConsumer = null;
+ }
+
+ protected Status status()
+ {
+ return this.status;
+ }
+
+ protected void status( Status status )
+ {
+ this.status = status;
}
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/FetchSizeUtil.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/FetchSizeUtil.java
new file mode 100644
index 0000000000..c8d669cc77
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/FetchSizeUtil.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2002-2019 "Neo4j,"
+ * Neo4j Sweden AB [http://neo4j.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.handlers.pulln;
+
+public class FetchSizeUtil
+{
+ public static final long UNLIMITED_FETCH_SIZE = -1;
+ public static final long DEFAULT_FETCH_SIZE = 1000;
+
+ public static long assertValidFetchSize( long size )
+ {
+ if ( size <= 0 && size != UNLIMITED_FETCH_SIZE )
+ {
+ throw new IllegalArgumentException( String.format( "The record fetch size may not be 0 or negative. Illegal record fetch size: %s.", size ) );
+ }
+ return size;
+ }
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/PullResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/PullResponseHandler.java
new file mode 100644
index 0000000000..5ca4130ab8
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/PullResponseHandler.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2002-2019 "Neo4j,"
+ * Neo4j Sweden AB [http://neo4j.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.handlers.pulln;
+
+import org.reactivestreams.Subscription;
+
+import java.util.function.BiConsumer;
+
+import org.neo4j.driver.internal.spi.ResponseHandler;
+import org.neo4j.driver.Record;
+import org.neo4j.driver.summary.ResultSummary;
+
+public interface PullResponseHandler extends ResponseHandler, Subscription
+{
+ /**
+ * Register a record consumer for each record received.
+ * STREAMING shall not be started before this consumer is registered.
+ * A null record with no error indicates the end of streaming.
+ * @param recordConsumer register a record consumer to be notified for each record received.
+ */
+ void installRecordConsumer( BiConsumer recordConsumer );
+
+ /**
+ * Register a summary consumer to be notified when a summary is received.
+ * STREAMING shall not be started before this consumer is registered.
+ * A null summary with no error indicates a SUCCESS message with has_more=true has arrived.
+ * @param summaryConsumer register a summary consumer
+ */
+ void installSummaryConsumer( BiConsumer summaryConsumer );
+
+ enum Status
+ {
+ SUCCEEDED, // successfully completed
+ FAILED, // failed
+ CANCELED, // canceled
+ STREAMING, // streaming records
+ READY // steaming is paused. ready to accept request or cancel commands from user
+ }
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/SessionPullResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/SessionPullResponseHandler.java
deleted file mode 100644
index 5f2cebed4f..0000000000
--- a/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/SessionPullResponseHandler.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright (c) 2002-2019 "Neo4j,"
- * Neo4j Sweden AB [http://neo4j.com]
- *
- * This file is part of Neo4j.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.neo4j.driver.internal.handlers.pulln;
-
-import java.util.Map;
-
-import org.neo4j.driver.internal.BookmarkHolder;
-import org.neo4j.driver.internal.handlers.RunResponseHandler;
-import org.neo4j.driver.internal.spi.Connection;
-import org.neo4j.driver.internal.util.MetadataExtractor;
-import org.neo4j.driver.Statement;
-import org.neo4j.driver.Value;
-
-import static java.util.Objects.requireNonNull;
-
-public class SessionPullResponseHandler extends AbstractBasicPullResponseHandler
-{
- private final BookmarkHolder bookmarkHolder;
-
- public SessionPullResponseHandler( Statement statement, RunResponseHandler runResponseHandler,
- Connection connection, BookmarkHolder bookmarkHolder, MetadataExtractor metadataExtractor )
- {
- super( statement, runResponseHandler, connection, metadataExtractor );
- this.bookmarkHolder = requireNonNull( bookmarkHolder );
- }
-
- @Override
- protected void afterSuccess( Map metadata )
- {
- releaseConnection();
- bookmarkHolder.setBookmark( metadataExtractor.extractBookmarks( metadata ) );
- }
-
- @Override
- protected void afterFailure( Throwable error )
- {
- releaseConnection();
- }
-
- private void releaseConnection()
- {
- connection.release(); // release in background
- }
-}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/TransactionPullResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/TransactionPullResponseHandler.java
deleted file mode 100644
index 5944d850cb..0000000000
--- a/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/TransactionPullResponseHandler.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright (c) 2002-2019 "Neo4j,"
- * Neo4j Sweden AB [http://neo4j.com]
- *
- * This file is part of Neo4j.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.neo4j.driver.internal.handlers.pulln;
-
-import java.util.Map;
-
-import org.neo4j.driver.Statement;
-import org.neo4j.driver.Value;
-import org.neo4j.driver.internal.async.ExplicitTransaction;
-import org.neo4j.driver.internal.handlers.RunResponseHandler;
-import org.neo4j.driver.internal.spi.Connection;
-import org.neo4j.driver.internal.util.MetadataExtractor;
-
-import static java.util.Objects.requireNonNull;
-
-public class TransactionPullResponseHandler extends AbstractBasicPullResponseHandler
-{
- private final ExplicitTransaction tx;
-
- public TransactionPullResponseHandler( Statement statement, RunResponseHandler runResponseHandler,
- Connection connection, ExplicitTransaction tx, MetadataExtractor metadataExtractor )
- {
- super( statement, runResponseHandler, connection, metadataExtractor );
- this.tx = requireNonNull( tx );
- }
-
- @Override
- protected void afterSuccess( Map metadata )
- {
- }
-
- @Override
- protected void afterFailure( Throwable error )
- {
- // always mark transaction as terminated because every error is "acknowledged" with a RESET message
- // so database forgets about the transaction after the first error
- // such transaction should not attempt to commit and can be considered as rolled back
- tx.markTerminated();
- }
-}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/messaging/BoltProtocol.java b/driver/src/main/java/org/neo4j/driver/internal/messaging/BoltProtocol.java
index 5255afc90b..13363fafd5 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/messaging/BoltProtocol.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/messaging/BoltProtocol.java
@@ -102,10 +102,11 @@ public interface BoltProtocol
* @param waitForRunResponse {@code true} for async query execution and {@code false} for blocking query
* execution. Makes returned cursor stage be chained after the RUN response arrives. Needed to have statement
* keys populated.
+ * @param fetchSize the record fetch size for PULL message.
* @return stage with cursor.
*/
- StatementResultCursorFactory runInAutoCommitTransaction( Connection connection, Statement statement,
- BookmarkHolder bookmarkHolder, TransactionConfig config, boolean waitForRunResponse );
+ StatementResultCursorFactory runInAutoCommitTransaction( Connection connection, Statement statement, BookmarkHolder bookmarkHolder,
+ TransactionConfig config, boolean waitForRunResponse, long fetchSize );
/**
* Execute the given statement in a running explicit transaction, i.e. {@link Transaction#run(Statement)}.
@@ -116,10 +117,11 @@ StatementResultCursorFactory runInAutoCommitTransaction( Connection connection,
* @param waitForRunResponse {@code true} for async query execution and {@code false} for blocking query
* execution. Makes returned cursor stage be chained after the RUN response arrives. Needed to have statement
* keys populated.
+ * @param fetchSize the record fetch size for PULL message.
* @return stage with cursor.
*/
- StatementResultCursorFactory runInExplicitTransaction( Connection connection, Statement statement, ExplicitTransaction tx,
- boolean waitForRunResponse );
+ StatementResultCursorFactory runInExplicitTransaction( Connection connection, Statement statement, ExplicitTransaction tx, boolean waitForRunResponse,
+ long fetchSize );
/**
* Returns the protocol version. It can be used for version specific error messages.
diff --git a/driver/src/main/java/org/neo4j/driver/internal/messaging/v1/BoltProtocolV1.java b/driver/src/main/java/org/neo4j/driver/internal/messaging/v1/BoltProtocolV1.java
index 4be49d145c..8fedd49812 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/messaging/v1/BoltProtocolV1.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/messaging/v1/BoltProtocolV1.java
@@ -34,13 +34,13 @@
import org.neo4j.driver.internal.DatabaseName;
import org.neo4j.driver.internal.InternalBookmark;
import org.neo4j.driver.internal.async.ExplicitTransaction;
-import org.neo4j.driver.internal.cursor.AsyncResultCursorOnlyFactory;
+import org.neo4j.driver.internal.cursor.AsyncStatementResultCursorOnlyFactory;
import org.neo4j.driver.internal.cursor.StatementResultCursorFactory;
-import org.neo4j.driver.internal.handlers.AbstractPullAllResponseHandler;
import org.neo4j.driver.internal.handlers.BeginTxResponseHandler;
import org.neo4j.driver.internal.handlers.CommitTxResponseHandler;
import org.neo4j.driver.internal.handlers.InitResponseHandler;
import org.neo4j.driver.internal.handlers.NoOpResponseHandler;
+import org.neo4j.driver.internal.handlers.PullAllResponseHandler;
import org.neo4j.driver.internal.handlers.PullHandlers;
import org.neo4j.driver.internal.handlers.RollbackTxResponseHandler;
import org.neo4j.driver.internal.handlers.RunResponseHandler;
@@ -159,8 +159,8 @@ public CompletionStage rollbackTransaction( Connection connection )
}
@Override
- public StatementResultCursorFactory runInAutoCommitTransaction( Connection connection, Statement statement,
- BookmarkHolder bookmarkHolder, TransactionConfig config, boolean waitForRunResponse )
+ public StatementResultCursorFactory runInAutoCommitTransaction( Connection connection, Statement statement, BookmarkHolder bookmarkHolder,
+ TransactionConfig config, boolean waitForRunResponse, long ignored )
{
// bookmarks are ignored for auto-commit transactions in this version of the protocol
verifyBeforeTransaction( config, connection.databaseName() );
@@ -169,7 +169,7 @@ public StatementResultCursorFactory runInAutoCommitTransaction( Connection conne
@Override
public StatementResultCursorFactory runInExplicitTransaction( Connection connection, Statement statement, ExplicitTransaction tx,
- boolean waitForRunResponse )
+ boolean waitForRunResponse, long ignored )
{
return buildResultCursorFactory( connection, statement, tx, waitForRunResponse );
}
@@ -188,9 +188,9 @@ private static StatementResultCursorFactory buildResultCursorFactory( Connection
RunMessage runMessage = new RunMessage( query, params );
RunResponseHandler runHandler = new RunResponseHandler( METADATA_EXTRACTOR );
- AbstractPullAllResponseHandler pullAllHandler = PullHandlers.newBoltV1PullAllHandler( statement, runHandler, connection, tx );
+ PullAllResponseHandler pullAllHandler = PullHandlers.newBoltV1PullAllHandler( statement, runHandler, connection, tx );
- return new AsyncResultCursorOnlyFactory( connection, runMessage, runHandler, pullAllHandler, waitForRunResponse );
+ return new AsyncStatementResultCursorOnlyFactory( connection, runMessage, runHandler, pullAllHandler, waitForRunResponse );
}
private void verifyBeforeTransaction( TransactionConfig config, DatabaseName databaseName )
diff --git a/driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java b/driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java
index eb3b7d6039..d663bbbb4c 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java
@@ -32,13 +32,13 @@
import org.neo4j.driver.internal.DatabaseName;
import org.neo4j.driver.internal.InternalBookmark;
import org.neo4j.driver.internal.async.ExplicitTransaction;
-import org.neo4j.driver.internal.cursor.AsyncResultCursorOnlyFactory;
+import org.neo4j.driver.internal.cursor.AsyncStatementResultCursorOnlyFactory;
import org.neo4j.driver.internal.cursor.StatementResultCursorFactory;
-import org.neo4j.driver.internal.handlers.AbstractPullAllResponseHandler;
import org.neo4j.driver.internal.handlers.BeginTxResponseHandler;
import org.neo4j.driver.internal.handlers.CommitTxResponseHandler;
import org.neo4j.driver.internal.handlers.HelloResponseHandler;
import org.neo4j.driver.internal.handlers.NoOpResponseHandler;
+import org.neo4j.driver.internal.handlers.PullAllResponseHandler;
import org.neo4j.driver.internal.handlers.RollbackTxResponseHandler;
import org.neo4j.driver.internal.handlers.RunResponseHandler;
import org.neo4j.driver.internal.messaging.BoltProtocol;
@@ -137,30 +137,30 @@ public CompletionStage rollbackTransaction( Connection connection )
}
@Override
- public StatementResultCursorFactory runInAutoCommitTransaction( Connection connection, Statement statement,
- BookmarkHolder bookmarkHolder, TransactionConfig config, boolean waitForRunResponse )
+ public StatementResultCursorFactory runInAutoCommitTransaction( Connection connection, Statement statement, BookmarkHolder bookmarkHolder,
+ TransactionConfig config, boolean waitForRunResponse, long fetchSize )
{
verifyDatabaseNameBeforeTransaction( connection.databaseName() );
RunWithMetadataMessage runMessage =
autoCommitTxRunMessage( statement, config, connection.databaseName(), connection.mode(), bookmarkHolder.getBookmark() );
- return buildResultCursorFactory( connection, statement, bookmarkHolder, null, runMessage, waitForRunResponse );
+ return buildResultCursorFactory( connection, statement, bookmarkHolder, null, runMessage, waitForRunResponse, fetchSize );
}
@Override
public StatementResultCursorFactory runInExplicitTransaction( Connection connection, Statement statement, ExplicitTransaction tx,
- boolean waitForRunResponse )
+ boolean waitForRunResponse, long fetchSize )
{
RunWithMetadataMessage runMessage = explicitTxRunMessage( statement );
- return buildResultCursorFactory( connection, statement, BookmarkHolder.NO_OP, tx, runMessage, waitForRunResponse );
+ return buildResultCursorFactory( connection, statement, BookmarkHolder.NO_OP, tx, runMessage, waitForRunResponse, fetchSize );
}
protected StatementResultCursorFactory buildResultCursorFactory( Connection connection, Statement statement, BookmarkHolder bookmarkHolder,
- ExplicitTransaction tx, RunWithMetadataMessage runMessage, boolean waitForRunResponse )
+ ExplicitTransaction tx, RunWithMetadataMessage runMessage, boolean waitForRunResponse, long ignored )
{
RunResponseHandler runHandler = new RunResponseHandler( METADATA_EXTRACTOR );
- AbstractPullAllResponseHandler pullHandler = newBoltV3PullAllHandler( statement, runHandler, connection, bookmarkHolder, tx );
+ PullAllResponseHandler pullHandler = newBoltV3PullAllHandler( statement, runHandler, connection, bookmarkHolder, tx );
- return new AsyncResultCursorOnlyFactory( connection, runMessage, runHandler, pullHandler, waitForRunResponse );
+ return new AsyncStatementResultCursorOnlyFactory( connection, runMessage, runHandler, pullHandler, waitForRunResponse );
}
protected void verifyDatabaseNameBeforeTransaction( DatabaseName databaseName )
diff --git a/driver/src/main/java/org/neo4j/driver/internal/messaging/v4/BoltProtocolV4.java b/driver/src/main/java/org/neo4j/driver/internal/messaging/v4/BoltProtocolV4.java
index 21d5927fa0..b00134c460 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/messaging/v4/BoltProtocolV4.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/messaging/v4/BoltProtocolV4.java
@@ -22,19 +22,19 @@
import org.neo4j.driver.internal.BookmarkHolder;
import org.neo4j.driver.internal.DatabaseName;
import org.neo4j.driver.internal.async.ExplicitTransaction;
-import org.neo4j.driver.internal.cursor.InternalStatementResultCursorFactory;
+import org.neo4j.driver.internal.cursor.StatementResultCursorFactoryImpl;
import org.neo4j.driver.internal.cursor.StatementResultCursorFactory;
-import org.neo4j.driver.internal.handlers.AbstractPullAllResponseHandler;
+import org.neo4j.driver.internal.handlers.PullAllResponseHandler;
import org.neo4j.driver.internal.handlers.RunResponseHandler;
-import org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler;
+import org.neo4j.driver.internal.handlers.pulln.PullResponseHandler;
import org.neo4j.driver.internal.messaging.BoltProtocol;
import org.neo4j.driver.internal.messaging.MessageFormat;
import org.neo4j.driver.internal.messaging.request.RunWithMetadataMessage;
import org.neo4j.driver.internal.messaging.v3.BoltProtocolV3;
import org.neo4j.driver.internal.spi.Connection;
-import static org.neo4j.driver.internal.handlers.PullHandlers.newBoltV3PullAllHandler;
-import static org.neo4j.driver.internal.handlers.PullHandlers.newBoltV4PullHandler;
+import static org.neo4j.driver.internal.handlers.PullHandlers.newBoltV4AutoPullHandler;
+import static org.neo4j.driver.internal.handlers.PullHandlers.newBoltV4BasicPullHandler;
public class BoltProtocolV4 extends BoltProtocolV3
{
@@ -49,14 +49,14 @@ public MessageFormat createMessageFormat()
@Override
protected StatementResultCursorFactory buildResultCursorFactory( Connection connection, Statement statement, BookmarkHolder bookmarkHolder,
- ExplicitTransaction tx, RunWithMetadataMessage runMessage, boolean waitForRunResponse )
+ ExplicitTransaction tx, RunWithMetadataMessage runMessage, boolean waitForRunResponse, long fetchSize )
{
RunResponseHandler runHandler = new RunResponseHandler( METADATA_EXTRACTOR );
- AbstractPullAllResponseHandler pullAllHandler = newBoltV3PullAllHandler( statement, runHandler, connection, bookmarkHolder, tx );
- BasicPullResponseHandler pullHandler = newBoltV4PullHandler( statement, runHandler, connection, bookmarkHolder, tx );
+ PullAllResponseHandler pullAllHandler = newBoltV4AutoPullHandler( statement, runHandler, connection, bookmarkHolder, tx, fetchSize );
+ PullResponseHandler pullHandler = newBoltV4BasicPullHandler( statement, runHandler, connection, bookmarkHolder, tx );
- return new InternalStatementResultCursorFactory( connection, runMessage, runHandler, pullHandler, pullAllHandler, waitForRunResponse );
+ return new StatementResultCursorFactoryImpl( connection, runMessage, runHandler, pullHandler, pullAllHandler, waitForRunResponse );
}
@Override
diff --git a/driver/src/main/java/org/neo4j/driver/internal/util/ServerVersion.java b/driver/src/main/java/org/neo4j/driver/internal/util/ServerVersion.java
index 1cbe3e0f42..381c0f18cd 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/util/ServerVersion.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/util/ServerVersion.java
@@ -64,7 +64,7 @@ public static ServerVersion version( Driver driver )
{
try ( Session session = driver.session() )
{
- String versionString = session.readTransaction( tx -> tx.run( "RETURN 1" ).consume().server().version() );
+ String versionString = session.readTransaction( tx -> tx.run( "RETURN 1" ).summary().server().version() );
return version( versionString );
}
}
diff --git a/driver/src/test/java/org/neo4j/driver/ConfigTest.java b/driver/src/test/java/org/neo4j/driver/ConfigTest.java
index 1847ab7008..395c0d5429 100644
--- a/driver/src/test/java/org/neo4j/driver/ConfigTest.java
+++ b/driver/src/test/java/org/neo4j/driver/ConfigTest.java
@@ -19,18 +19,23 @@
package org.neo4j.driver;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.util.concurrent.TimeUnit;
import org.neo4j.driver.net.ServerAddressResolver;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
+import static org.neo4j.driver.internal.handlers.pulln.FetchSizeUtil.DEFAULT_FETCH_SIZE;
class ConfigTest
{
@@ -273,4 +278,26 @@ void shouldNotAllowNullResolver()
{
assertThrows( NullPointerException.class, () -> Config.builder().withResolver( null ) );
}
+
+ @Test
+ void shouldDefaultToDefaultFetchSize() throws Throwable
+ {
+ Config config = Config.defaultConfig();
+ assertEquals( DEFAULT_FETCH_SIZE, config.fetchSize() );
+ }
+
+ @ParameterizedTest
+ @ValueSource( longs = {100, 1, 1000, Long.MAX_VALUE, -1} )
+ void shouldChangeFetchSize( long value ) throws Throwable
+ {
+ Config config = Config.builder().withFetchSize( value ).build();
+ assertThat( config.fetchSize(), equalTo( value ) );
+ }
+
+ @ParameterizedTest
+ @ValueSource( longs = {0, -100, -2} )
+ void shouldErrorWithIllegalFetchSize( long value ) throws Throwable
+ {
+ assertThrows( IllegalArgumentException.class, () -> Config.builder().withFetchSize( value ).build() );
+ }
}
diff --git a/driver/src/test/java/org/neo4j/driver/ParametersTest.java b/driver/src/test/java/org/neo4j/driver/ParametersTest.java
index 2f64b0d667..a74e4d3400 100644
--- a/driver/src/test/java/org/neo4j/driver/ParametersTest.java
+++ b/driver/src/test/java/org/neo4j/driver/ParametersTest.java
@@ -42,6 +42,7 @@
import static org.mockito.Mockito.mock;
import static org.neo4j.driver.Values.parameters;
import static org.neo4j.driver.internal.DatabaseNameUtil.defaultDatabase;
+import static org.neo4j.driver.internal.handlers.pulln.FetchSizeUtil.UNLIMITED_FETCH_SIZE;
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
import static org.neo4j.driver.internal.util.ValueFactory.emptyNodeValue;
import static org.neo4j.driver.internal.util.ValueFactory.emptyRelationshipValue;
@@ -109,7 +110,7 @@ private Session mockedSession()
ConnectionProvider provider = mock( ConnectionProvider.class );
RetryLogic retryLogic = mock( RetryLogic.class );
NetworkSession session =
- new NetworkSession( provider, retryLogic, defaultDatabase(), AccessMode.WRITE, new DefaultBookmarkHolder(), DEV_NULL_LOGGING );
+ new NetworkSession( provider, retryLogic, defaultDatabase(), AccessMode.WRITE, new DefaultBookmarkHolder(), UNLIMITED_FETCH_SIZE, DEV_NULL_LOGGING );
return new InternalSession( session );
}
}
diff --git a/driver/src/test/java/org/neo4j/driver/SessionConfigTest.java b/driver/src/test/java/org/neo4j/driver/SessionConfigTest.java
index ef2bf931fb..b69e319df4 100644
--- a/driver/src/test/java/org/neo4j/driver/SessionConfigTest.java
+++ b/driver/src/test/java/org/neo4j/driver/SessionConfigTest.java
@@ -27,6 +27,7 @@
import java.util.Arrays;
import java.util.List;
+import java.util.Optional;
import java.util.stream.Stream;
import static java.util.Collections.emptyList;
@@ -53,6 +54,7 @@ void shouldReturnDefaultValues() throws Throwable
assertEquals( AccessMode.WRITE, config.defaultAccessMode() );
assertFalse( config.database().isPresent() );
assertNull( config.bookmarks() );
+ assertFalse( config.fetchSize().isPresent() );
}
@ParameterizedTest
@@ -145,4 +147,28 @@ void shouldAcceptNullInBookmarks() throws Throwable
SessionConfig config2 = builder().withBookmarks( Arrays.asList( one, two, null ) ).build();
assertThat( config2.bookmarks(), equalTo( Arrays.asList( one, two, null ) ) );
}
+
+ @ParameterizedTest
+ @ValueSource( longs = {100, 1, 1000, Long.MAX_VALUE, -1} )
+ void shouldChangeFetchSize( long value ) throws Throwable
+ {
+ SessionConfig config = builder().withFetchSize( value ).build();
+ assertThat( config.fetchSize(), equalTo( Optional.of( value ) ) );
+ }
+
+ @ParameterizedTest
+ @ValueSource( longs = {0, -100, -2} )
+ void shouldErrorWithIllegalFetchSize( long value ) throws Throwable
+ {
+ assertThrows( IllegalArgumentException.class, () -> builder().withFetchSize( value ).build() );
+ }
+
+ @Test
+ void shouldTwoConfigBeEqual() throws Throwable
+ {
+ SessionConfig config1 = builder().withFetchSize( 100 ).build();
+ SessionConfig config2 = builder().withFetchSize( 100 ).build();
+
+ assertEquals( config1, config2 );
+ }
}
diff --git a/driver/src/test/java/org/neo4j/driver/integration/BookmarkIT.java b/driver/src/test/java/org/neo4j/driver/integration/BookmarkIT.java
index c4e3e5167a..733a5a8e31 100644
--- a/driver/src/test/java/org/neo4j/driver/integration/BookmarkIT.java
+++ b/driver/src/test/java/org/neo4j/driver/integration/BookmarkIT.java
@@ -166,7 +166,7 @@ void bookmarkRemainsAfterSuccessfulSessionRun()
Bookmark bookmark = session.lastBookmark();
assertBookmarkContainsSingleValue( bookmark );
- session.run( "RETURN 1" ).consume();
+ session.run( "RETURN 1" ).summary();
assertEquals( bookmark, session.lastBookmark() );
}
@@ -181,7 +181,7 @@ void bookmarkRemainsAfterFailedSessionRun()
Bookmark bookmark = session.lastBookmark();
assertBookmarkContainsSingleValue( bookmark );
- assertThrows( ClientException.class, () -> session.run( "RETURN" ).consume() );
+ assertThrows( ClientException.class, () -> session.run( "RETURN" ).summary() );
assertEquals( bookmark, session.lastBookmark() );
}
diff --git a/driver/src/test/java/org/neo4j/driver/integration/ConnectionHandlingIT.java b/driver/src/test/java/org/neo4j/driver/integration/ConnectionHandlingIT.java
index 41807328ae..448900932b 100644
--- a/driver/src/test/java/org/neo4j/driver/integration/ConnectionHandlingIT.java
+++ b/driver/src/test/java/org/neo4j/driver/integration/ConnectionHandlingIT.java
@@ -116,7 +116,7 @@ void connectionUsedForSessionRunReturnedToThePoolWhenResultConsumed()
Connection connection1 = connectionPool.lastAcquiredConnectionSpy;
verify( connection1, never() ).release();
- result.consume();
+ result.summary();
Connection connection2 = connectionPool.lastAcquiredConnectionSpy;
assertSame( connection1, connection2 );
@@ -201,7 +201,7 @@ void connectionUsedForSessionRunReturnedToThePoolWhenServerErrorDuringResultFetc
Connection connection1 = connectionPool.lastAcquiredConnectionSpy;
verify( connection1, never() ).release();
- assertThrows( ClientException.class, result::consume );
+ assertThrows( ClientException.class, result::summary );
Connection connection2 = connectionPool.lastAcquiredConnectionSpy;
assertSame( connection1, connection2 );
@@ -219,6 +219,7 @@ void connectionUsedForTransactionReturnedToThePoolWhenTransactionCommitted()
verify( connection1, never() ).release();
StatementResult result = createNodes( 5, tx );
+ int size = result.list().size();
tx.commit();
tx.close();
@@ -226,7 +227,7 @@ void connectionUsedForTransactionReturnedToThePoolWhenTransactionCommitted()
assertSame( connection1, connection2 );
verify( connection1 ).release();
- assertEquals( 5, result.list().size() );
+ assertEquals( 5, size );
}
@Test
@@ -240,6 +241,7 @@ void connectionUsedForTransactionReturnedToThePoolWhenTransactionRolledBack()
verify( connection1, never() ).release();
StatementResult result = createNodes( 8, tx );
+ int size = result.list().size();
tx.rollback();
tx.close();
@@ -247,7 +249,7 @@ void connectionUsedForTransactionReturnedToThePoolWhenTransactionRolledBack()
assertSame( connection1, connection2 );
verify( connection1 ).release();
- assertEquals( 8, result.list().size() );
+ assertEquals( 8, size );
}
@Test
diff --git a/driver/src/test/java/org/neo4j/driver/integration/ConnectionPoolIT.java b/driver/src/test/java/org/neo4j/driver/integration/ConnectionPoolIT.java
index 5bfae403b8..4a7bac1cfd 100644
--- a/driver/src/test/java/org/neo4j/driver/integration/ConnectionPoolIT.java
+++ b/driver/src/test/java/org/neo4j/driver/integration/ConnectionPoolIT.java
@@ -173,7 +173,7 @@ private static void startAndCloseTransactions( Driver driver, int txCount )
{
for ( StatementResult result : results )
{
- result.consume();
+ result.summary();
}
for ( Transaction tx : transactions )
{
diff --git a/driver/src/test/java/org/neo4j/driver/integration/CredentialsIT.java b/driver/src/test/java/org/neo4j/driver/integration/CredentialsIT.java
index 3684d24132..8a09cddbe9 100644
--- a/driver/src/test/java/org/neo4j/driver/integration/CredentialsIT.java
+++ b/driver/src/test/java/org/neo4j/driver/integration/CredentialsIT.java
@@ -76,7 +76,7 @@ void shouldBePossibleToChangePassword() throws Exception
try ( Driver driver = GraphDatabase.driver( neo4j.uri(), authToken );
Session session = driver.session() )
{
- session.run( "RETURN 1" ).consume();
+ session.run( "RETURN 1" ).summary();
}
// verify old password does not work
@@ -87,7 +87,7 @@ void shouldBePossibleToChangePassword() throws Exception
try ( Driver driver = GraphDatabase.driver( CredentialsIT.neo4j.uri(), AuthTokens.basic( "neo4j", newPassword ) );
Session session = driver.session() )
{
- session.run( "RETURN 2" ).consume();
+ session.run( "RETURN 2" ).summary();
}
}
diff --git a/driver/src/test/java/org/neo4j/driver/integration/ErrorIT.java b/driver/src/test/java/org/neo4j/driver/integration/ErrorIT.java
index a47f6e9d47..af32f970a1 100644
--- a/driver/src/test/java/org/neo4j/driver/integration/ErrorIT.java
+++ b/driver/src/test/java/org/neo4j/driver/integration/ErrorIT.java
@@ -81,7 +81,7 @@ void shouldThrowHelpfulSyntaxError()
ClientException e = assertThrows( ClientException.class, () ->
{
StatementResult result = session.run( "invalid statement" );
- result.consume();
+ result.summary();
} );
assertThat( e.getMessage(), startsWith( "Invalid input" ) );
@@ -94,7 +94,7 @@ void shouldNotAllowMoreTxAfterClientException()
Transaction tx = session.beginTransaction();
// And Given an error has occurred
- try { tx.run( "invalid" ).consume(); } catch ( ClientException e ) {/*empty*/}
+ try { tx.run( "invalid" ).summary(); } catch ( ClientException e ) {/*empty*/}
// Expect
ClientException e = assertThrows( ClientException.class, () ->
@@ -109,7 +109,7 @@ void shouldNotAllowMoreTxAfterClientException()
void shouldAllowNewStatementAfterRecoverableError()
{
// Given an error has occurred
- try { session.run( "invalid" ).consume(); } catch ( ClientException e ) {/*empty*/}
+ try { session.run( "invalid" ).summary(); } catch ( ClientException e ) {/*empty*/}
// When
StatementResult cursor = session.run( "RETURN 1" );
@@ -125,7 +125,7 @@ void shouldAllowNewTransactionAfterRecoverableError()
// Given an error has occurred in a prior transaction
try ( Transaction tx = session.beginTransaction() )
{
- tx.run( "invalid" ).consume();
+ tx.run( "invalid" ).summary();
}
catch ( ClientException e ) {/*empty*/}
@@ -241,7 +241,7 @@ void shouldCloseChannelOnInboundFatalFailureMessage() throws InterruptedExceptio
@Test
void shouldThrowErrorWithNiceStackTrace( TestInfo testInfo )
{
- ClientException error = assertThrows( ClientException.class, () -> session.run( "RETURN 10 / 0" ).consume() );
+ ClientException error = assertThrows( ClientException.class, () -> session.run( "RETURN 10 / 0" ).summary() );
// thrown error should have this class & method in the stacktrace
StackTraceElement[] stackTrace = error.getStackTrace();
@@ -273,7 +273,7 @@ private Throwable testChannelErrorHandling( Consumer messa
try
{
- session.run( "RETURN 1" ).consume();
+ session.run( "RETURN 1" ).summary();
fail( "Exception expected" );
}
catch ( Throwable error )
diff --git a/driver/src/test/java/org/neo4j/driver/integration/ExplicitTransactionIT.java b/driver/src/test/java/org/neo4j/driver/integration/ExplicitTransactionIT.java
index 6c70ce0c21..d6533963bc 100644
--- a/driver/src/test/java/org/neo4j/driver/integration/ExplicitTransactionIT.java
+++ b/driver/src/test/java/org/neo4j/driver/integration/ExplicitTransactionIT.java
@@ -190,7 +190,7 @@ void shouldBePossibleToRunMoreTransactionsAfterOneIsTerminated()
await( session.beginTransactionAsync( TransactionConfig.empty() )
.thenCompose( tx -> tx.runAsync( new Statement( "CREATE (:Node {id: 42})" ), true )
- .thenCompose( StatementResultCursor::consumeAsync )
+ .thenCompose( StatementResultCursor::summaryAsync )
.thenApply( ignore -> tx )
).thenCompose( ExplicitTransaction::commitAsync ) );
diff --git a/driver/src/test/java/org/neo4j/driver/integration/ParametersIT.java b/driver/src/test/java/org/neo4j/driver/integration/ParametersIT.java
index 85be734b78..61357bf6bf 100644
--- a/driver/src/test/java/org/neo4j/driver/integration/ParametersIT.java
+++ b/driver/src/test/java/org/neo4j/driver/integration/ParametersIT.java
@@ -495,7 +495,7 @@ private static byte[] randomByteArray( int length )
private static void expectIOExceptionWithMessage( Value value, String message )
{
- ServiceUnavailableException e = assertThrows( ServiceUnavailableException.class, () -> session.run( "RETURN {a}", value ).consume() );
+ ServiceUnavailableException e = assertThrows( ServiceUnavailableException.class, () -> session.run( "RETURN {a}", value ).summary() );
Throwable cause = e.getCause();
assertThat( cause, instanceOf( IOException.class ) );
assertThat( cause.getMessage(), equalTo( message ) );
diff --git a/driver/src/test/java/org/neo4j/driver/integration/ResultStreamIT.java b/driver/src/test/java/org/neo4j/driver/integration/ResultStreamIT.java
index 15e9341727..05b0832cc5 100644
--- a/driver/src/test/java/org/neo4j/driver/integration/ResultStreamIT.java
+++ b/driver/src/test/java/org/neo4j/driver/integration/ResultStreamIT.java
@@ -126,7 +126,7 @@ void shouldBeAbleToReuseSessionAfterFailure()
{
// Given
StatementResult res1 = session.run( "INVALID" );
- assertThrows( Exception.class, res1::consume );
+ assertThrows( Exception.class, res1::summary );
// When
StatementResult res2 = session.run( "RETURN 1" );
@@ -144,18 +144,9 @@ void shouldBeAbleToAccessSummaryAfterFailure()
ResultSummary summary;
// When
- try
- {
- res1.consume();
- }
- catch ( Exception e )
- {
- //ignore
- }
- finally
- {
- summary = res1.summary();
- }
+ assertThrows( Exception.class, res1::summary );
+ summary = res1.summary();
+
// Then
assertThat( summary, notNullValue() );
@@ -184,7 +175,7 @@ void shouldBeAbleToAccessSummaryAfterTransactionFailure()
}
@Test
- void shouldBufferRecordsAfterSummary()
+ void shouldNotBufferRecordsAfterSummary()
{
// Given
StatementResult result = session.run("UNWIND [1,2] AS a RETURN a");
@@ -197,8 +188,7 @@ void shouldBufferRecordsAfterSummary()
assertThat( summary.server().address(), equalTo( "localhost:" + session.boltPort() ) );
assertThat( summary.counters().nodesCreated(), equalTo( 0 ) );
- assertThat( result.next().get( "a" ).asInt(), equalTo( 1 ) );
- assertThat( result.next().get( "a" ).asInt(), equalTo( 2 ) );
+ assertFalse( result.hasNext() );
}
@Test
@@ -208,7 +198,7 @@ void shouldDiscardRecordsAfterConsume()
StatementResult result = session.run("UNWIND [1,2] AS a RETURN a");
// When
- ResultSummary summary = result.consume();
+ ResultSummary summary = result.summary();
// Then
assertThat( summary, notNullValue() );
@@ -291,7 +281,7 @@ void shouldConvertEventuallyFailingStatementResultToStream()
assertThat( e.getMessage(), containsString( "/ by zero" ) );
- // stream should manage to consume all elements except the last one, which produces an error
+ // stream should manage to summary all elements except the last one, which produces an error
assertEquals( asList( 1, 1, 1, 1, 1 ), seen );
}
diff --git a/driver/src/test/java/org/neo4j/driver/integration/RoutingDriverBoltKitTest.java b/driver/src/test/java/org/neo4j/driver/integration/RoutingDriverBoltKitTest.java
index 9bf1b8e941..851e4b3de9 100644
--- a/driver/src/test/java/org/neo4j/driver/integration/RoutingDriverBoltKitTest.java
+++ b/driver/src/test/java/org/neo4j/driver/integration/RoutingDriverBoltKitTest.java
@@ -114,8 +114,8 @@ void shouldHandleAcquireReadTransaction() throws IOException, InterruptedExcepti
Session session = driver.session( builder().withDefaultAccessMode( AccessMode.READ ).build() ) )
{
- List result = session.readTransaction( tx -> tx.run( "MATCH (n) RETURN n.name" ) )
- .list( record -> record.get( "n.name" ).asString() );
+ List result = session.readTransaction( tx -> tx.run( "MATCH (n) RETURN n.name" )
+ .list( record -> record.get( "n.name" ).asString() ) );
assertThat( result, equalTo( asList( "Bob", "Alice", "Tina" ) ) );
}
@@ -265,7 +265,7 @@ void shouldThrowSessionExpiredIfWriteServerDisappears() throws IOException, Inte
try ( Driver driver = GraphDatabase.driver( uri, INSECURE_CONFIG );
Session session = driver.session( builder().withDefaultAccessMode( AccessMode.WRITE ).build() ) )
{
- assertThrows( SessionExpiredException.class, () -> session.run( "CREATE (n {name:'Bob'})" ).consume() );
+ assertThrows( SessionExpiredException.class, () -> session.run( "CREATE (n {name:'Bob'})" ).summary() );
}
finally
{
@@ -289,7 +289,7 @@ void shouldThrowSessionExpiredIfWriteServerDisappearsWhenUsingTransaction() thro
Session session = driver.session( builder().withDefaultAccessMode( AccessMode.WRITE ).build() );
Transaction tx = session.beginTransaction() )
{
- assertThrows( SessionExpiredException.class, () -> tx.run( "MATCH (n) RETURN n.name" ).consume() );
+ assertThrows( SessionExpiredException.class, () -> tx.run( "MATCH (n) RETURN n.name" ).summary() );
}
finally
{
@@ -446,7 +446,7 @@ void shouldHandleLeaderSwitchWhenWriting() throws IOException, InterruptedExcept
boolean failed = false;
try ( Session session = driver.session( builder().withDefaultAccessMode( AccessMode.WRITE ).build() ) )
{
- session.run( "CREATE ()" ).consume();
+ session.run( "CREATE ()" ).summary();
}
catch ( SessionExpiredException e )
{
@@ -500,7 +500,7 @@ void shouldHandleLeaderSwitchWhenWritingInTransaction() throws IOException, Inte
boolean failed = false;
try ( Session session = driver.session( builder().withDefaultAccessMode( AccessMode.WRITE ).build() ); Transaction tx = session.beginTransaction() )
{
- tx.run( "CREATE ()" ).consume();
+ tx.run( "CREATE ()" ).summary();
}
catch ( SessionExpiredException e )
{
@@ -902,7 +902,7 @@ void shouldServeReadsButFailWritesWhenNoWritersAvailable() throws Exception
{
assertEquals( asList( "Bob", "Alice", "Tina" ), readStrings( "MATCH (n) RETURN n.name", session ) );
- assertThrows( SessionExpiredException.class, () -> session.run( "CREATE (n {name:'Bob'})" ).consume() );
+ assertThrows( SessionExpiredException.class, () -> session.run( "CREATE (n {name:'Bob'})" ).summary() );
}
finally
{
@@ -960,12 +960,12 @@ void shouldTreatRoutingTableWithSingleRouterAsValid() throws Exception
// read multiple times without additional rediscovery
StatementResult readResult1 = session.run( "MATCH (n) RETURN n.name" );
- assertEquals( "127.0.0.1:9003", readResult1.summary().server().address() );
assertEquals( 3, readResult1.list().size() );
+ assertEquals( "127.0.0.1:9003", readResult1.summary().server().address() );
StatementResult readResult2 = session.run( "MATCH (n) RETURN n.name" );
- assertEquals( "127.0.0.1:9004", readResult2.summary().server().address() );
assertEquals( 3, readResult2.list().size() );
+ assertEquals( "127.0.0.1:9004", readResult2.summary().server().address() );
}
finally
{
@@ -1076,11 +1076,13 @@ void shouldUseResolverDuringRediscoveryWhenExistingRoutersFail() throws Exceptio
try ( Session session = driver.session() )
{
// run first query against 9001, which should return result and exit
- List names1 = session.run( "MATCH (n) RETURN n.name AS name" ).list( record -> record.get( "name" ).asString() );
+ List names1 = session.run( "MATCH (n) RETURN n.name AS name" )
+ .list( record -> record.get( "name" ).asString() );
assertEquals( asList( "Alice", "Bob", "Eve" ), names1 );
// run second query with retries, it should rediscover using 9042 returned by the resolver and read from 9005
- List names2 = session.readTransaction( tx -> tx.run( "MATCH (n) RETURN n.name" ) ).list( record -> record.get( 0 ).asString() );
+ List names2 = session.readTransaction( tx -> tx.run( "MATCH (n) RETURN n.name" )
+ .list( record -> record.get( 0 ).asString() ) );
assertEquals( asList( "Bob", "Alice", "Tina" ), names2 );
}
}
@@ -1140,7 +1142,7 @@ void shouldRevertToInitialRouterIfKnownRouterThrowsProtocolErrors() throws Excep
{
try ( Session session = driver.session( builder().withDefaultAccessMode( AccessMode.READ ).build() ) )
{
- List records = session.readTransaction( tx -> tx.run( "MATCH (n) RETURN n.name" ) ).list();
+ List records = session.readTransaction( tx -> tx.run( "MATCH (n) RETURN n.name" ).list() );
assertEquals( 3, records.size() );
}
}
diff --git a/driver/src/test/java/org/neo4j/driver/integration/SessionBoltV3IT.java b/driver/src/test/java/org/neo4j/driver/integration/SessionBoltV3IT.java
index 8a3dd872d3..a1802cbbee 100644
--- a/driver/src/test/java/org/neo4j/driver/integration/SessionBoltV3IT.java
+++ b/driver/src/test/java/org/neo4j/driver/integration/SessionBoltV3IT.java
@@ -30,7 +30,9 @@
import java.util.Map;
import java.util.concurrent.CompletionStage;
+import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Driver;
+import org.neo4j.driver.Record;
import org.neo4j.driver.Session;
import org.neo4j.driver.StatementResult;
import org.neo4j.driver.Transaction;
@@ -38,7 +40,6 @@
import org.neo4j.driver.async.AsyncSession;
import org.neo4j.driver.async.StatementResultCursor;
import org.neo4j.driver.exceptions.TransientException;
-import org.neo4j.driver.Bookmark;
import org.neo4j.driver.internal.cluster.RoutingSettings;
import org.neo4j.driver.internal.messaging.Message;
import org.neo4j.driver.internal.messaging.request.GoodbyeMessage;
@@ -117,21 +118,21 @@ void shouldSetTransactionTimeout()
{
// create a dummy node
Session session = driver.session();
- session.run( "CREATE (:Node)" ).consume();
+ session.run( "CREATE (:Node)" ).summary();
try ( Session otherSession = driver.driver().session() )
{
try ( Transaction otherTx = otherSession.beginTransaction() )
{
// lock dummy node but keep the transaction open
- otherTx.run( "MATCH (n:Node) SET n.prop = 1" ).consume();
+ otherTx.run( "MATCH (n:Node) SET n.prop = 1" ).summary();
assertTimeoutPreemptively( TX_TIMEOUT_TEST_TIMEOUT, () -> {
TransactionConfig config = TransactionConfig.builder().withTimeout( ofMillis( 1 ) ).build();
// run a query in an auto-commit transaction with timeout and try to update the locked dummy node
TransientException error = assertThrows( TransientException.class,
- () -> session.run( "MATCH (n:Node) SET n.prop = 2", config ).consume() );
+ () -> session.run( "MATCH (n:Node) SET n.prop = 2", config ).summary() );
assertThat( error.getMessage(), containsString( "terminated" ) );
} );
}
@@ -143,14 +144,14 @@ void shouldSetTransactionTimeoutAsync()
{
// create a dummy node
AsyncSession asyncSession = driver.asyncSession();
- await( await( asyncSession.runAsync( "CREATE (:Node)" ) ).consumeAsync() );
+ await( await( asyncSession.runAsync( "CREATE (:Node)" ) ).summaryAsync() );
try ( Session otherSession = driver.driver().session() )
{
try ( Transaction otherTx = otherSession.beginTransaction() )
{
// lock dummy node but keep the transaction open
- otherTx.run( "MATCH (n:Node) SET n.prop = 1" ).consume();
+ otherTx.run( "MATCH (n:Node) SET n.prop = 1" ).summary();
assertTimeoutPreemptively( TX_TIMEOUT_TEST_TIMEOUT, () -> {
TransactionConfig config = TransactionConfig.builder()
@@ -159,7 +160,7 @@ void shouldSetTransactionTimeoutAsync()
// run a query in an auto-commit transaction with timeout and try to update the locked dummy node
CompletionStage resultFuture = asyncSession.runAsync( "MATCH (n:Node) SET n.prop = 2", config )
- .thenCompose( StatementResultCursor::consumeAsync );
+ .thenCompose( StatementResultCursor::summaryAsync );
TransientException error = assertThrows( TransientException.class, () -> await( resultFuture ) );
@@ -199,18 +200,18 @@ void shouldUseBookmarksForAutoCommitTransactions()
Session session = driver.session();
Bookmark initialBookmark = session.lastBookmark();
- session.run( "CREATE ()" ).consume();
+ session.run( "CREATE ()" ).summary();
Bookmark bookmark1 = session.lastBookmark();
assertNotNull( bookmark1 );
assertNotEquals( initialBookmark, bookmark1 );
- session.run( "CREATE ()" ).consume();
+ session.run( "CREATE ()" ).summary();
Bookmark bookmark2 = session.lastBookmark();
assertNotNull( bookmark2 );
assertNotEquals( initialBookmark, bookmark2 );
assertNotEquals( bookmark1, bookmark2 );
- session.run( "CREATE ()" ).consume();
+ session.run( "CREATE ()" ).summary();
Bookmark bookmark3 = session.lastBookmark();
assertNotNull( bookmark3 );
assertNotEquals( initialBookmark, bookmark3 );
@@ -233,7 +234,7 @@ void shouldUseBookmarksForAutoCommitAndExplicitTransactions()
assertNotNull( bookmark1 );
assertNotEquals( initialBookmark, bookmark1 );
- session.run( "CREATE ()" ).consume();
+ session.run( "CREATE ()" ).summary();
Bookmark bookmark2 = session.lastBookmark();
assertNotNull( bookmark2 );
assertNotEquals( initialBookmark, bookmark2 );
@@ -262,7 +263,7 @@ void shouldUseBookmarksForAutoCommitTransactionsAndTransactionFunctions()
assertNotNull( bookmark1 );
assertNotEquals( initialBookmark, bookmark1 );
- session.run( "CREATE ()" ).consume();
+ session.run( "CREATE ()" ).summary();
Bookmark bookmark2 = session.lastBookmark();
assertNotNull( bookmark2 );
assertNotEquals( initialBookmark, bookmark2 );
@@ -329,12 +330,11 @@ private static void testTransactionMetadataWithAsyncTransactionFunctions( boolea
.build();
// call listTransactions procedure that should list itself with the specified metadata
- CompletionStage cursorFuture =
- read ? asyncSession.readTransactionAsync( tx -> tx.runAsync( "CALL dbms.listTransactions()" ), config )
- : asyncSession.writeTransactionAsync( tx -> tx.runAsync( "CALL dbms.listTransactions()" ), config );
+ CompletionStage singleFuture =
+ read ? asyncSession.readTransactionAsync( tx -> tx.runAsync( "CALL dbms.listTransactions()" ).thenCompose( StatementResultCursor::singleAsync ), config )
+ : asyncSession.writeTransactionAsync( tx -> tx.runAsync( "CALL dbms.listTransactions()" ).thenCompose( StatementResultCursor::singleAsync ), config );
- CompletionStage