Skip to content

Commit 36eb748

Browse files
author
Zhen Li
committed
Adding fetchSize at driver config and session config.
1 parent da8e744 commit 36eb748

37 files changed

+314
-83
lines changed

driver/src/main/java/org/neo4j/driver/Config.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,24 @@
1818
*/
1919
package org.neo4j.driver;
2020

21+
import org.reactivestreams.Subscription;
22+
2123
import java.io.File;
2224
import java.net.InetAddress;
2325
import java.util.Objects;
2426
import java.util.concurrent.TimeUnit;
2527
import java.util.logging.Level;
2628

29+
import org.neo4j.driver.async.AsyncSession;
2730
import org.neo4j.driver.exceptions.ServiceUnavailableException;
2831
import org.neo4j.driver.exceptions.SessionExpiredException;
2932
import org.neo4j.driver.exceptions.TransientException;
3033
import org.neo4j.driver.internal.async.pool.PoolSettings;
3134
import org.neo4j.driver.internal.cluster.RoutingSettings;
35+
import org.neo4j.driver.internal.handlers.pulln.FetchSizeUtil;
3236
import org.neo4j.driver.internal.retry.RetrySettings;
3337
import org.neo4j.driver.net.ServerAddressResolver;
38+
import org.neo4j.driver.reactive.RxSession;
3439
import org.neo4j.driver.util.Immutable;
3540
import org.neo4j.driver.util.Resource;
3641

@@ -88,6 +93,7 @@ public class Config
8893

8994
private final int routingFailureLimit;
9095
private final long routingRetryDelayMillis;
96+
private final long fetchSize;
9197
private long routingTablePurgeDelayMillis;
9298

9399
private final int connectionTimeoutMillis;
@@ -114,6 +120,7 @@ private Config( ConfigBuilder builder )
114120
this.routingTablePurgeDelayMillis = builder.routingTablePurgeDelayMillis;
115121
this.retrySettings = builder.retrySettings;
116122
this.resolver = builder.resolver;
123+
this.fetchSize = builder.fetchSize;
117124

118125
this.isMetricsEnabled = builder.isMetricsEnabled;
119126
}
@@ -230,6 +237,11 @@ RetrySettings retrySettings()
230237
return retrySettings;
231238
}
232239

240+
public long fetchSize()
241+
{
242+
return fetchSize;
243+
}
244+
233245
/**
234246
* @return if the metrics is enabled or not on this driver.
235247
*/
@@ -258,7 +270,7 @@ public static class ConfigBuilder
258270
private RetrySettings retrySettings = RetrySettings.DEFAULT;
259271
private ServerAddressResolver resolver;
260272
private boolean isMetricsEnabled = false;
261-
273+
private long fetchSize = FetchSizeUtil.DEFAULT_FETCH_SIZE;
262274

263275
private ConfigBuilder() {}
264276

@@ -566,6 +578,26 @@ public ConfigBuilder withRoutingTablePurgeDelay( long delay, TimeUnit unit )
566578
return this;
567579
}
568580

581+
/**
582+
* Specify how many records to fetch in each batch.
583+
* This config is only valid when the driver is used with servers that support Bolt V4 (Server version 4.0 and later).
584+
*
585+
* Bolt V4 enables pulling records in batches to allow client to take control of data population and apply back pressure to server.
586+
* This config specifies the default fetch size for all query runs using {@link Session} and {@link AsyncSession}.
587+
* By default, the value is set to {@code 1000}.
588+
* Use {@code -1} to disables back pressure and config client to pull all records at once after each run.
589+
*
590+
* This config only applies to run result obtained via {@link Session} and {@link AsyncSession}.
591+
* As with {@link RxSession}, the batch size is provided via {@link Subscription#request(long)} instead.
592+
* @param size the default record fetch size when pulling records in batches using Bolt V4.
593+
* @return this builder
594+
*/
595+
public ConfigBuilder withFetchSize( long size )
596+
{
597+
this.fetchSize = FetchSizeUtil.assertValidFetchSize( size );
598+
return this;
599+
}
600+
569601
/**
570602
* Specify socket connection timeout.
571603
* <p>

driver/src/main/java/org/neo4j/driver/SessionConfig.java

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.neo4j.driver;
2020

21+
import org.reactivestreams.Subscription;
22+
2123
import java.util.Arrays;
2224
import java.util.Objects;
2325
import java.util.Optional;
@@ -26,6 +28,7 @@
2628
import org.neo4j.driver.reactive.RxSession;
2729

2830
import static java.util.Objects.requireNonNull;
31+
import static org.neo4j.driver.internal.handlers.pulln.FetchSizeUtil.assertValidFetchSize;
2932

3033
/**
3134
* The session configurations used to configure a session.
@@ -37,12 +40,14 @@ public class SessionConfig
3740
private final Iterable<Bookmark> bookmarks;
3841
private final AccessMode defaultAccessMode;
3942
private final String database;
43+
private final Optional<Long> fetchSize;
4044

4145
private SessionConfig( Builder builder )
4246
{
4347
this.bookmarks = builder.bookmarks;
4448
this.defaultAccessMode = builder.defaultAccessMode;
4549
this.database = builder.database;
50+
this.fetchSize = builder.fetchSize;
4651
}
4752

4853
/**
@@ -109,6 +114,15 @@ public Optional<String> database()
109114
return Optional.ofNullable( database );
110115
}
111116

117+
/**
118+
* This value if set, overrides the default fetch size set on {@link Config#fetchSize()}.
119+
* @return an optional value of fetch size.
120+
*/
121+
public Optional<Long> fetchSize()
122+
{
123+
return fetchSize;
124+
}
125+
112126
@Override
113127
public boolean equals( Object o )
114128
{
@@ -121,7 +135,8 @@ public boolean equals( Object o )
121135
return false;
122136
}
123137
SessionConfig that = (SessionConfig) o;
124-
return Objects.equals( bookmarks, that.bookmarks ) && defaultAccessMode == that.defaultAccessMode && Objects.equals( database, that.database );
138+
return Objects.equals( bookmarks, that.bookmarks ) && defaultAccessMode == that.defaultAccessMode && Objects.equals( database, that.database )
139+
&& Objects.equals( fetchSize, that.fetchSize );
125140
}
126141

127142
@Override
@@ -133,14 +148,16 @@ public int hashCode()
133148
@Override
134149
public String toString()
135150
{
136-
return "SessionParameters{" + "bookmarks=" + bookmarks + ", defaultAccessMode=" + defaultAccessMode + ", database='" + database + '\'' + '}';
151+
return "SessionParameters{" + "bookmarks=" + bookmarks + ", defaultAccessMode=" + defaultAccessMode + ", database='" + database + '\'' +
152+
", fetchSize=" + fetchSize + '}';
137153
}
138154

139155
/**
140156
* Builder used to configure {@link SessionConfig} which will be used to create a session.
141157
*/
142158
public static class Builder
143159
{
160+
private Optional<Long> fetchSize = Optional.empty();
144161
private Iterable<Bookmark> bookmarks = null;
145162
private AccessMode defaultAccessMode = AccessMode.WRITE;
146163
private String database = null;
@@ -230,6 +247,27 @@ public Builder withDatabase( String database )
230247
return this;
231248
}
232249

250+
/**
251+
* Specify how many records to fetch in each batch for this session.
252+
* This config will overrides the default value set on {@link Config#fetchSize()}.
253+
* This config is only valid when the driver is used with servers that support Bolt V4 (Server version 4.0 and later).
254+
*
255+
* Bolt V4 enables pulling records in batches to allow client to take control of data population and apply back pressure to server.
256+
* This config specifies the default fetch size for all query runs using {@link Session} and {@link AsyncSession}.
257+
* By default, the value is set to {@code 1000}.
258+
* Use {@code -1} to disables back pressure and config client to pull all records at once after each run.
259+
*
260+
* This config only applies to run result obtained via {@link Session} and {@link AsyncSession}.
261+
* As with {@link RxSession}, the batch size is provided via {@link Subscription#request(long)} instead.
262+
* @param size the default record fetch size when pulling records in batches using Bolt V4.
263+
* @return this builder
264+
*/
265+
public Builder withFetchSize( long size )
266+
{
267+
this.fetchSize = Optional.of( assertValidFetchSize( size ) );
268+
return this;
269+
}
270+
233271
public SessionConfig build()
234272
{
235273
return new SessionConfig( this );

driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,10 +164,10 @@ private static RuntimeException driverCloseException()
164164
return new IllegalStateException( "This driver instance has already been closed" );
165165
}
166166

167-
public NetworkSession newSession( SessionConfig parameters )
167+
public NetworkSession newSession( SessionConfig config )
168168
{
169169
assertOpen();
170-
NetworkSession session = sessionFactory.newInstance( parameters );
170+
NetworkSession session = sessionFactory.newInstance( config );
171171
if ( closed.get() )
172172
{
173173
// session does not immediately acquire connection, it is fine to just throw

driver/src/main/java/org/neo4j/driver/internal/SessionFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
public interface SessionFactory
2727
{
28-
NetworkSession newInstance( SessionConfig parameters );
28+
NetworkSession newInstance( SessionConfig sessionConfig );
2929

3030
CompletionStage<Void> verifyConnectivity();
3131

driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,21 +36,28 @@ public class SessionFactoryImpl implements SessionFactory
3636
private final RetryLogic retryLogic;
3737
private final Logging logging;
3838
private final boolean leakedSessionsLoggingEnabled;
39+
private final long defaultFetchSize;
3940

4041
SessionFactoryImpl( ConnectionProvider connectionProvider, RetryLogic retryLogic, Config config )
4142
{
4243
this.connectionProvider = connectionProvider;
4344
this.leakedSessionsLoggingEnabled = config.logLeakedSessions();
4445
this.retryLogic = retryLogic;
4546
this.logging = config.logging();
47+
this.defaultFetchSize = config.fetchSize();
4648
}
4749

4850
@Override
4951
public NetworkSession newInstance( SessionConfig sessionConfig )
5052
{
5153
BookmarkHolder bookmarkHolder = new DefaultBookmarkHolder( InternalBookmark.from( sessionConfig.bookmarks() ) );
5254
return createSession( connectionProvider, retryLogic, parseDatabaseName( sessionConfig ),
53-
sessionConfig.defaultAccessMode(), bookmarkHolder, logging );
55+
sessionConfig.defaultAccessMode(), bookmarkHolder, parseFetchSize( sessionConfig ), logging );
56+
}
57+
58+
private long parseFetchSize( SessionConfig sessionConfig )
59+
{
60+
return sessionConfig.fetchSize().orElse( defaultFetchSize );
5461
}
5562

5663
private DatabaseName parseDatabaseName( SessionConfig sessionConfig )
@@ -85,10 +92,10 @@ public ConnectionProvider getConnectionProvider()
8592
}
8693

8794
private NetworkSession createSession( ConnectionProvider connectionProvider, RetryLogic retryLogic, DatabaseName databaseName, AccessMode mode,
88-
BookmarkHolder bookmarkHolder, Logging logging )
95+
BookmarkHolder bookmarkHolder, long fetchSize, Logging logging )
8996
{
9097
return leakedSessionsLoggingEnabled
91-
? new LeakLoggingNetworkSession( connectionProvider, retryLogic, databaseName, mode, bookmarkHolder, logging )
92-
: new NetworkSession( connectionProvider, retryLogic, databaseName, mode, bookmarkHolder, logging );
98+
? new LeakLoggingNetworkSession( connectionProvider, retryLogic, databaseName, mode, bookmarkHolder, fetchSize, logging )
99+
: new NetworkSession( connectionProvider, retryLogic, databaseName, mode, bookmarkHolder, fetchSize, logging );
93100
}
94101
}

driver/src/main/java/org/neo4j/driver/internal/async/ExplicitTransaction.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,15 +62,17 @@ private enum State
6262
private final BoltProtocol protocol;
6363
private final BookmarkHolder bookmarkHolder;
6464
private final ResultCursorsHolder resultCursors;
65+
private final long fetchSize;
6566

6667
private volatile State state = State.ACTIVE;
6768

68-
public ExplicitTransaction( Connection connection, BookmarkHolder bookmarkHolder )
69+
public ExplicitTransaction( Connection connection, BookmarkHolder bookmarkHolder, long fetchSize )
6970
{
7071
this.connection = connection;
7172
this.protocol = connection.protocol();
7273
this.bookmarkHolder = bookmarkHolder;
7374
this.resultCursors = new ResultCursorsHolder();
75+
this.fetchSize = fetchSize;
7476
}
7577

7678
public CompletionStage<ExplicitTransaction> beginAsync( InternalBookmark initialBookmark, TransactionConfig config )
@@ -140,7 +142,7 @@ public CompletionStage<StatementResultCursor> runAsync( Statement statement, boo
140142
{
141143
ensureCanRunQueries();
142144
CompletionStage<AsyncStatementResultCursor> cursorStage =
143-
protocol.runInExplicitTransaction( connection, statement, this, waitForRunResponse ).asyncResult();
145+
protocol.runInExplicitTransaction( connection, statement, this, waitForRunResponse, fetchSize ).asyncResult();
144146
resultCursors.add( cursorStage );
145147
return cursorStage.thenApply( cursor -> cursor );
146148
}
@@ -149,7 +151,7 @@ public CompletionStage<RxStatementResultCursor> runRx( Statement statement )
149151
{
150152
ensureCanRunQueries();
151153
CompletionStage<RxStatementResultCursor> cursorStage =
152-
protocol.runInExplicitTransaction( connection, statement, this, false ).rxResult();
154+
protocol.runInExplicitTransaction( connection, statement, this, false, fetchSize ).rxResult();
153155
resultCursors.add( cursorStage );
154156
return cursorStage;
155157
}

driver/src/main/java/org/neo4j/driver/internal/async/LeakLoggingNetworkSession.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ public class LeakLoggingNetworkSession extends NetworkSession
3333
private final String stackTrace;
3434

3535
public LeakLoggingNetworkSession( ConnectionProvider connectionProvider, RetryLogic retryLogic, DatabaseName databaseName, AccessMode mode,
36-
BookmarkHolder bookmarkHolder, Logging logging )
36+
BookmarkHolder bookmarkHolder, long fetchSize, Logging logging )
3737
{
38-
super( connectionProvider, retryLogic, databaseName, mode, bookmarkHolder, logging );
38+
super( connectionProvider, retryLogic, databaseName, mode, bookmarkHolder, fetchSize, logging );
3939
this.stackTrace = captureStackTrace();
4040
}
4141

driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,21 +57,23 @@ public class NetworkSession
5757
protected final Logger logger;
5858

5959
private final BookmarkHolder bookmarkHolder;
60+
private final long fetchSize;
6061
private volatile CompletionStage<ExplicitTransaction> transactionStage = completedWithNull();
6162
private volatile CompletionStage<Connection> connectionStage = completedWithNull();
6263
private volatile CompletionStage<? extends FailableCursor> resultCursorStage = completedWithNull();
6364

6465
private final AtomicBoolean open = new AtomicBoolean( true );
6566

6667
public NetworkSession( ConnectionProvider connectionProvider, RetryLogic retryLogic, DatabaseName databaseName, AccessMode mode,
67-
BookmarkHolder bookmarkHolder, Logging logging )
68+
BookmarkHolder bookmarkHolder, long fetchSize, Logging logging )
6869
{
6970
this.connectionProvider = connectionProvider;
7071
this.mode = mode;
7172
this.retryLogic = retryLogic;
7273
this.logger = new PrefixedLogger( "[" + hashCode() + "]", logging.getLog( LOG_NAME ) );
7374
this.bookmarkHolder = bookmarkHolder;
7475
this.connectionContext = new NetworkSessionConnectionContext( databaseName, bookmarkHolder.getBookmark() );
76+
this.fetchSize = fetchSize;
7577
}
7678

7779
public CompletionStage<StatementResultCursor> runAsync( Statement statement, TransactionConfig config, boolean waitForRunResponse )
@@ -106,7 +108,7 @@ public CompletionStage<ExplicitTransaction> beginTransactionAsync( AccessMode mo
106108
.thenCompose( ignore -> acquireConnection( mode ) )
107109
.thenCompose( connection ->
108110
{
109-
ExplicitTransaction tx = new ExplicitTransaction( connection, bookmarkHolder );
111+
ExplicitTransaction tx = new ExplicitTransaction( connection, bookmarkHolder, fetchSize );
110112
return tx.beginAsync( bookmarkHolder.getBookmark(), config );
111113
} );
112114

@@ -231,7 +233,7 @@ private CompletionStage<StatementResultCursorFactory> buildResultCursorFactory(
231233
try
232234
{
233235
StatementResultCursorFactory factory = connection.protocol()
234-
.runInAutoCommitTransaction( connection, statement, bookmarkHolder, config, waitForRunResponse );
236+
.runInAutoCommitTransaction( connection, statement, bookmarkHolder, config, waitForRunResponse, fetchSize );
235237
return completedFuture( factory );
236238
}
237239
catch ( Throwable e )

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunner.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939

4040
import static org.neo4j.driver.Values.parameters;
4141
import static org.neo4j.driver.internal.DatabaseNameUtil.defaultDatabase;
42+
import static org.neo4j.driver.internal.handlers.pulln.FetchSizeUtil.UNLIMITED_FETCH_SIZE;
4243

4344
public class RoutingProcedureRunner
4445
{
@@ -86,7 +87,7 @@ BookmarkHolder bookmarkHolder( InternalBookmark ignored )
8687
CompletionStage<List<Record>> runProcedure( Connection connection, Statement procedure, BookmarkHolder bookmarkHolder )
8788
{
8889
return connection.protocol()
89-
.runInAutoCommitTransaction( connection, procedure, bookmarkHolder, TransactionConfig.empty(), true )
90+
.runInAutoCommitTransaction( connection, procedure, bookmarkHolder, TransactionConfig.empty(), true, UNLIMITED_FETCH_SIZE )
9091
.asyncResult().thenCompose( StatementResultCursor::listAsync );
9192
}
9293

driver/src/main/java/org/neo4j/driver/internal/handlers/PullHandlers.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,11 @@ public static PullAllResponseHandler newBoltV3PullAllHandler( Statement statemen
4747
}
4848

4949
public static PullAllResponseHandler newBoltV4AutoPullHandler( Statement statement, RunResponseHandler runHandler, Connection connection,
50-
BookmarkHolder bookmarkHolder, ExplicitTransaction tx )
50+
BookmarkHolder bookmarkHolder, ExplicitTransaction tx, long fetchSize )
5151
{
5252
PullResponseCompletionListener completionListener = createPullResponseCompletionListener( connection, bookmarkHolder, tx );
5353

54-
return new AutoPullResponseHandler( statement, runHandler, connection, BoltProtocolV3.METADATA_EXTRACTOR, completionListener );
54+
return new AutoPullResponseHandler( statement, runHandler, connection, BoltProtocolV3.METADATA_EXTRACTOR, completionListener, fetchSize );
5555
}
5656

5757

0 commit comments

Comments
 (0)