Skip to content

Pulling in batches for async and blocking API with BOLT V4. #637

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Oct 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion driver/src/main/java/org/neo4j/driver/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,24 @@
*/
package org.neo4j.driver;

import org.reactivestreams.Subscription;

import java.io.File;
import java.net.InetAddress;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;

import org.neo4j.driver.async.AsyncSession;
import org.neo4j.driver.exceptions.ServiceUnavailableException;
import org.neo4j.driver.exceptions.SessionExpiredException;
import org.neo4j.driver.exceptions.TransientException;
import org.neo4j.driver.internal.async.pool.PoolSettings;
import org.neo4j.driver.internal.cluster.RoutingSettings;
import org.neo4j.driver.internal.handlers.pulln.FetchSizeUtil;
import org.neo4j.driver.internal.retry.RetrySettings;
import org.neo4j.driver.net.ServerAddressResolver;
import org.neo4j.driver.reactive.RxSession;
import org.neo4j.driver.util.Immutable;
import org.neo4j.driver.util.Resource;

Expand Down Expand Up @@ -88,6 +93,7 @@ public class Config

private final int routingFailureLimit;
private final long routingRetryDelayMillis;
private final long fetchSize;
private long routingTablePurgeDelayMillis;

private final int connectionTimeoutMillis;
Expand All @@ -114,6 +120,7 @@ private Config( ConfigBuilder builder )
this.routingTablePurgeDelayMillis = builder.routingTablePurgeDelayMillis;
this.retrySettings = builder.retrySettings;
this.resolver = builder.resolver;
this.fetchSize = builder.fetchSize;

this.isMetricsEnabled = builder.isMetricsEnabled;
}
Expand Down Expand Up @@ -230,6 +237,11 @@ RetrySettings retrySettings()
return retrySettings;
}

/**
 * The record fetch size held by this driver configuration.
 * The value is copied from the builder when this config is created.
 *
 * @return the configured fetch size.
 */
public long fetchSize()
{
    return this.fetchSize;
}

/**
* @return if the metrics is enabled or not on this driver.
*/
Expand Down Expand Up @@ -258,7 +270,7 @@ public static class ConfigBuilder
private RetrySettings retrySettings = RetrySettings.DEFAULT;
private ServerAddressResolver resolver;
private boolean isMetricsEnabled = false;

private long fetchSize = FetchSizeUtil.DEFAULT_FETCH_SIZE;

private ConfigBuilder() {}

Expand Down Expand Up @@ -566,6 +578,26 @@ public ConfigBuilder withRoutingTablePurgeDelay( long delay, TimeUnit unit )
return this;
}

/**
 * Specify how many records to fetch in each batch.
 * <p>
 * This config is only valid when the driver is used with servers that support Bolt V4 (server version 4.0 and later).
 * Bolt V4 enables pulling records in batches, which allows the client to take control of data population and apply back pressure to the server.
 * <p>
 * This config specifies the default fetch size for all query runs using {@link Session} and {@link AsyncSession}.
 * By default, the value is set to {@code 1000}.
 * Use {@code -1} to disable back pressure and configure the client to pull all records at once after each run.
 * <p>
 * This config only applies to run results obtained via {@link Session} and {@link AsyncSession}.
 * With {@link RxSession}, the batch size is provided via {@link Subscription#request(long)} instead.
 *
 * @param size the default record fetch size when pulling records in batches using Bolt V4.
 * @return this builder
 */
public ConfigBuilder withFetchSize( long size )
{
    // The size is validated before it is stored on the builder.
    long validatedSize = FetchSizeUtil.assertValidFetchSize( size );
    this.fetchSize = validatedSize;
    return this;
}

/**
* Specify socket connection timeout.
* <p>
Expand Down
42 changes: 40 additions & 2 deletions driver/src/main/java/org/neo4j/driver/SessionConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.neo4j.driver;

import org.reactivestreams.Subscription;

import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -26,6 +28,7 @@
import org.neo4j.driver.reactive.RxSession;

import static java.util.Objects.requireNonNull;
import static org.neo4j.driver.internal.handlers.pulln.FetchSizeUtil.assertValidFetchSize;

/**
* The session configurations used to configure a session.
Expand All @@ -37,12 +40,14 @@ public class SessionConfig
private final Iterable<Bookmark> bookmarks;
private final AccessMode defaultAccessMode;
private final String database;
private final Optional<Long> fetchSize;

/**
 * Creates a session config by copying every setting held by the given builder.
 *
 * @param builder the builder carrying the bookmarks, default access mode, database and fetch size.
 */
private SessionConfig( Builder builder )
{
    this.fetchSize = builder.fetchSize;
    this.database = builder.database;
    this.defaultAccessMode = builder.defaultAccessMode;
    this.bookmarks = builder.bookmarks;
}

/**
Expand Down Expand Up @@ -109,6 +114,15 @@ public Optional<String> database()
return Optional.ofNullable( database );
}

/**
 * The fetch size configured for this session, if any.
 * When a value is present, it overrides the default fetch size set on {@link Config#fetchSize()}.
 *
 * @return an optional value of fetch size.
 */
public Optional<Long> fetchSize()
{
    return this.fetchSize;
}

@Override
public boolean equals( Object o )
{
Expand All @@ -121,7 +135,8 @@ public boolean equals( Object o )
return false;
}
SessionConfig that = (SessionConfig) o;
return Objects.equals( bookmarks, that.bookmarks ) && defaultAccessMode == that.defaultAccessMode && Objects.equals( database, that.database );
return Objects.equals( bookmarks, that.bookmarks ) && defaultAccessMode == that.defaultAccessMode && Objects.equals( database, that.database )
&& Objects.equals( fetchSize, that.fetchSize );
}

@Override
Expand All @@ -133,14 +148,16 @@ public int hashCode()
@Override
public String toString()
{
return "SessionParameters{" + "bookmarks=" + bookmarks + ", defaultAccessMode=" + defaultAccessMode + ", database='" + database + '\'' + '}';
return "SessionParameters{" + "bookmarks=" + bookmarks + ", defaultAccessMode=" + defaultAccessMode + ", database='" + database + '\'' +
", fetchSize=" + fetchSize + '}';
}

/**
* Builder used to configure {@link SessionConfig} which will be used to create a session.
*/
public static class Builder
{
private Optional<Long> fetchSize = Optional.empty();
private Iterable<Bookmark> bookmarks = null;
private AccessMode defaultAccessMode = AccessMode.WRITE;
private String database = null;
Expand Down Expand Up @@ -230,6 +247,27 @@ public Builder withDatabase( String database )
return this;
}

/**
 * Specify how many records to fetch in each batch for this session.
 * This config overrides the default value set on {@link Config#fetchSize()}.
 * <p>
 * This config is only valid when the driver is used with servers that support Bolt V4 (server version 4.0 and later).
 * Bolt V4 enables pulling records in batches, which allows the client to take control of data population and apply back pressure to the server.
 * <p>
 * This config specifies the default fetch size for all query runs using {@link Session} and {@link AsyncSession}.
 * By default, the value is set to {@code 1000}.
 * Use {@code -1} to disable back pressure and configure the client to pull all records at once after each run.
 * <p>
 * This config only applies to run results obtained via {@link Session} and {@link AsyncSession}.
 * With {@link RxSession}, the batch size is provided via {@link Subscription#request(long)} instead.
 *
 * @param size the default record fetch size when pulling records in batches using Bolt V4.
 * @return this builder
 */
public Builder withFetchSize( long size )
{
    // The size is validated before it is wrapped and stored on the builder.
    long validatedSize = assertValidFetchSize( size );
    this.fetchSize = Optional.of( validatedSize );
    return this;
}

public SessionConfig build()
{
return new SessionConfig( this );
Expand Down
23 changes: 3 additions & 20 deletions driver/src/main/java/org/neo4j/driver/StatementResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@

import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Stream;

import org.neo4j.driver.exceptions.NoSuchRecordException;
import org.neo4j.driver.summary.ResultSummary;
import java.util.function.Function;
import org.neo4j.driver.util.Resource;


Expand Down Expand Up @@ -138,29 +138,12 @@ public interface StatementResult extends Iterator<Record>
*/
<T> List<T> list( Function<Record, T> mapFunction );

/**
* Consume the entire result, yielding a summary of it.
*
* Calling this method exhausts the result.
*
* <pre class="doctest:ResultDocIT#summarizeUsage">
* {@code
* ResultSummary summary = session.run( "PROFILE MATCH (n:User {id: 12345}) RETURN n" ).consume();
* }
* </pre>
*
* @return a summary for the whole query result
*/
ResultSummary consume();

/**
* Return the result summary.
*
* If the records in the result is not fully consumed, then calling this method will force to pull all remaining
* records into buffer to yield the summary.
* If the records in the result are not fully consumed, then calling this method will exhaust the result.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe clarify that exhausting means exhausting the current batch, not the whole possible result set / size.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it will exhaust the whole result. It is equivalent to Subscription#cancel.

We currently cannot discard N, we can only discard ALL.

The following code is equivalent to a driver user:

// Code with 1.7 driver
public ResultSummary run() {
	try (AutoCloseableStatementRunner statementRunner = getStatementRunner(this.targetDatabase)) {
		StatementResult result = runnableStatement.runWith(statementRunner);
		return result.consume();
	}
}

// Code with 4.0 driver
public ResultSummary run() {
	try (AutoCloseableStatementRunner statementRunner = getStatementRunner(this.targetDatabase)) {
		StatementResult result = runnableStatement.runWith(statementRunner);
		return result.summary();
	}
}

*
* If you want to obtain the summary but discard the records, use
* {@link StatementResult#consume()} instead.
* If you want to access unconsumed records after summary, you shall use {@link StatementResult#list()} to buffer all records into memory before summary.
*
* @return a summary for the whole query result.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import org.neo4j.driver.Record;
import org.neo4j.driver.Records;
import org.neo4j.driver.StatementResult;
import org.neo4j.driver.exceptions.NoSuchRecordException;
import org.neo4j.driver.summary.ResultSummary;

Expand Down Expand Up @@ -72,10 +73,9 @@ public interface StatementResultCursor
/**
* Asynchronously retrieve the result summary.
* <p>
* If the records in the result is not fully consumed, then calling this method will force to pull all remaining
* records into buffer to yield the summary.
* If the records in the result are not fully consumed, then calling this method will exhaust the result.
* <p>
* If you want to obtain the summary but discard the records, use {@link #consumeAsync()} instead.
* If you want to access unconsumed records after summary, you shall use {@link StatementResult#list()} to buffer all records into memory before summary.
*
* @return a {@link CompletionStage} completed with a summary for the whole query result. Stage can also be
* completed exceptionally if query execution fails.
Expand Down Expand Up @@ -110,14 +110,6 @@ public interface StatementResultCursor
*/
CompletionStage<Record> singleAsync();

/**
* Asynchronously consume the entire result, yielding a summary of it. Calling this method exhausts the result.
*
* @return a {@link CompletionStage} completed with a summary for the whole query result. Stage can also be
* completed exceptionally if query execution fails.
*/
CompletionStage<ResultSummary> consumeAsync();

/**
* Asynchronously apply the given {@link Consumer action} to every record in the result, yielding a summary of it.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@

public interface FailableCursor
{
CompletionStage<Throwable> consumeAsync();
CompletionStage<Throwable> failureAsync();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still needed? At least from what I read, you are now using consumeAsync where failureAsync on the failable cursor was used before.

I found that the failable cursor is used as interface for the RxStatementResultCursor as well… Hmm, it feels like that area could need some unification as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The difference between these two methods:
consumeAsync: Discards all unconsumed records and returns the error, if any, from the whole execution and streaming. Used in StatementRunner#close, since after this boundary all records are discarded.
failureAsync: Buffers all unconsumed records into memory and returns the error, if any, from the whole execution and streaming. Used for nested queries between session#run calls, so that the second run does not discard the unconsumed records of the previous run.

}
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,10 @@ private static RuntimeException driverCloseException()
return new IllegalStateException( "This driver instance has already been closed" );
}

public NetworkSession newSession( SessionConfig parameters )
public NetworkSession newSession( SessionConfig config )
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated, but good catch 👍

{
assertOpen();
NetworkSession session = sessionFactory.newInstance( parameters );
NetworkSession session = sessionFactory.newInstance( config );
if ( closed.get() )
{
// session does not immediately acquire connection, it is fine to just throw
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.Record;
import org.neo4j.driver.StatementResult;
import org.neo4j.driver.async.StatementResultCursor;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.NoSuchRecordException;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.summary.ResultSummary;
import java.util.function.Function;

public class InternalStatementResult implements StatementResult
{
Expand Down Expand Up @@ -111,12 +111,6 @@ public <T> List<T> list( Function<Record, T> mapFunction )
return blockingGet( cursor.listAsync( mapFunction ) );
}

@Override
public ResultSummary consume()
{
return blockingGet( cursor.consumeAsync() );
}

@Override
public ResultSummary summary()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

public interface SessionFactory
{
NetworkSession newInstance( SessionConfig parameters );
NetworkSession newInstance( SessionConfig sessionConfig );

CompletionStage<Void> verifyConnectivity();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,28 @@ public class SessionFactoryImpl implements SessionFactory
private final RetryLogic retryLogic;
private final Logging logging;
private final boolean leakedSessionsLoggingEnabled;
private final long defaultFetchSize;

/**
 * Creates a session factory from its collaborators and the driver configuration.
 *
 * @param connectionProvider the connection provider stored for sessions created by this factory.
 * @param retryLogic the retry logic stored for sessions created by this factory.
 * @param config the driver config; the leaked-session logging flag, the logging and the default fetch size are read from it.
 */
SessionFactoryImpl( ConnectionProvider connectionProvider, RetryLogic retryLogic, Config config )
{
    // Collaborators handed in directly.
    this.connectionProvider = connectionProvider;
    this.retryLogic = retryLogic;

    // Settings read once from the driver config.
    this.leakedSessionsLoggingEnabled = config.logLeakedSessions();
    this.logging = config.logging();
    this.defaultFetchSize = config.fetchSize();
}

@Override
public NetworkSession newInstance( SessionConfig sessionConfig )
{
BookmarkHolder bookmarkHolder = new DefaultBookmarkHolder( InternalBookmark.from( sessionConfig.bookmarks() ) );
return createSession( connectionProvider, retryLogic, parseDatabaseName( sessionConfig ),
sessionConfig.defaultAccessMode(), bookmarkHolder, logging );
sessionConfig.defaultAccessMode(), bookmarkHolder, parseFetchSize( sessionConfig ), logging );
}

/**
 * Resolves the fetch size to use for a new session.
 *
 * @param sessionConfig the session config that may carry its own fetch size.
 * @return the fetch size from the session config when present, otherwise the default fetch size of this factory.
 */
private long parseFetchSize( SessionConfig sessionConfig )
{
    // A fetch size set on the session takes precedence over the driver-wide default.
    return sessionConfig.fetchSize().orElseGet( () -> defaultFetchSize );
}

private DatabaseName parseDatabaseName( SessionConfig sessionConfig )
Expand Down Expand Up @@ -85,10 +92,10 @@ public ConnectionProvider getConnectionProvider()
}

private NetworkSession createSession( ConnectionProvider connectionProvider, RetryLogic retryLogic, DatabaseName databaseName, AccessMode mode,
BookmarkHolder bookmarkHolder, Logging logging )
BookmarkHolder bookmarkHolder, long fetchSize, Logging logging )
{
return leakedSessionsLoggingEnabled
? new LeakLoggingNetworkSession( connectionProvider, retryLogic, databaseName, mode, bookmarkHolder, logging )
: new NetworkSession( connectionProvider, retryLogic, databaseName, mode, bookmarkHolder, logging );
? new LeakLoggingNetworkSession( connectionProvider, retryLogic, databaseName, mode, bookmarkHolder, fetchSize, logging )
: new NetworkSession( connectionProvider, retryLogic, databaseName, mode, bookmarkHolder, fetchSize, logging );
}
}
Loading