Skip to content

Commit 8d5c196

Browse files
authored
Merge pull request #637 from zhenlineo/4.0-aync-pull-n
Back pressure support for async and blocking API with BOLT V4.
2 parents b083dfd + 9014d06 commit 8d5c196

File tree

102 files changed

+2187
-1708
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

102 files changed

+2187
-1708
lines changed

driver/src/main/java/org/neo4j/driver/Config.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,24 @@
1818
*/
1919
package org.neo4j.driver;
2020

21+
import org.reactivestreams.Subscription;
22+
2123
import java.io.File;
2224
import java.net.InetAddress;
2325
import java.util.Objects;
2426
import java.util.concurrent.TimeUnit;
2527
import java.util.logging.Level;
2628

29+
import org.neo4j.driver.async.AsyncSession;
2730
import org.neo4j.driver.exceptions.ServiceUnavailableException;
2831
import org.neo4j.driver.exceptions.SessionExpiredException;
2932
import org.neo4j.driver.exceptions.TransientException;
3033
import org.neo4j.driver.internal.async.pool.PoolSettings;
3134
import org.neo4j.driver.internal.cluster.RoutingSettings;
35+
import org.neo4j.driver.internal.handlers.pulln.FetchSizeUtil;
3236
import org.neo4j.driver.internal.retry.RetrySettings;
3337
import org.neo4j.driver.net.ServerAddressResolver;
38+
import org.neo4j.driver.reactive.RxSession;
3439
import org.neo4j.driver.util.Immutable;
3540
import org.neo4j.driver.util.Resource;
3641

@@ -88,6 +93,7 @@ public class Config
8893

8994
private final int routingFailureLimit;
9095
private final long routingRetryDelayMillis;
96+
private final long fetchSize;
9197
private long routingTablePurgeDelayMillis;
9298

9399
private final int connectionTimeoutMillis;
@@ -114,6 +120,7 @@ private Config( ConfigBuilder builder )
114120
this.routingTablePurgeDelayMillis = builder.routingTablePurgeDelayMillis;
115121
this.retrySettings = builder.retrySettings;
116122
this.resolver = builder.resolver;
123+
this.fetchSize = builder.fetchSize;
117124

118125
this.isMetricsEnabled = builder.isMetricsEnabled;
119126
}
@@ -230,6 +237,11 @@ RetrySettings retrySettings()
230237
return retrySettings;
231238
}
232239

240+
public long fetchSize()
241+
{
242+
return fetchSize;
243+
}
244+
233245
/**
234246
* @return if the metrics is enabled or not on this driver.
235247
*/
@@ -258,7 +270,7 @@ public static class ConfigBuilder
258270
private RetrySettings retrySettings = RetrySettings.DEFAULT;
259271
private ServerAddressResolver resolver;
260272
private boolean isMetricsEnabled = false;
261-
273+
private long fetchSize = FetchSizeUtil.DEFAULT_FETCH_SIZE;
262274

263275
private ConfigBuilder() {}
264276

@@ -566,6 +578,26 @@ public ConfigBuilder withRoutingTablePurgeDelay( long delay, TimeUnit unit )
566578
return this;
567579
}
568580

581+
/**
582+
* Specify how many records to fetch in each batch.
583+
* This config is only valid when the driver is used with servers that support Bolt V4 (Server version 4.0 and later).
584+
*
585+
* Bolt V4 enables pulling records in batches to allow client to take control of data population and apply back pressure to server.
586+
* This config specifies the default fetch size for all query runs using {@link Session} and {@link AsyncSession}.
587+
* By default, the value is set to {@code 1000}.
588+
* Use {@code -1} to disables back pressure and config client to pull all records at once after each run.
589+
*
590+
* This config only applies to run result obtained via {@link Session} and {@link AsyncSession}.
591+
* As with {@link RxSession}, the batch size is provided via {@link Subscription#request(long)} instead.
592+
* @param size the default record fetch size when pulling records in batches using Bolt V4.
593+
* @return this builder
594+
*/
595+
public ConfigBuilder withFetchSize( long size )
596+
{
597+
this.fetchSize = FetchSizeUtil.assertValidFetchSize( size );
598+
return this;
599+
}
600+
569601
/**
570602
* Specify socket connection timeout.
571603
* <p>

driver/src/main/java/org/neo4j/driver/SessionConfig.java

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.neo4j.driver;
2020

21+
import org.reactivestreams.Subscription;
22+
2123
import java.util.Arrays;
2224
import java.util.Objects;
2325
import java.util.Optional;
@@ -26,6 +28,7 @@
2628
import org.neo4j.driver.reactive.RxSession;
2729

2830
import static java.util.Objects.requireNonNull;
31+
import static org.neo4j.driver.internal.handlers.pulln.FetchSizeUtil.assertValidFetchSize;
2932

3033
/**
3134
* The session configurations used to configure a session.
@@ -37,12 +40,14 @@ public class SessionConfig
3740
private final Iterable<Bookmark> bookmarks;
3841
private final AccessMode defaultAccessMode;
3942
private final String database;
43+
private final Optional<Long> fetchSize;
4044

4145
private SessionConfig( Builder builder )
4246
{
4347
this.bookmarks = builder.bookmarks;
4448
this.defaultAccessMode = builder.defaultAccessMode;
4549
this.database = builder.database;
50+
this.fetchSize = builder.fetchSize;
4651
}
4752

4853
/**
@@ -109,6 +114,15 @@ public Optional<String> database()
109114
return Optional.ofNullable( database );
110115
}
111116

117+
/**
118+
* This value if set, overrides the default fetch size set on {@link Config#fetchSize()}.
119+
* @return an optional value of fetch size.
120+
*/
121+
public Optional<Long> fetchSize()
122+
{
123+
return fetchSize;
124+
}
125+
112126
@Override
113127
public boolean equals( Object o )
114128
{
@@ -121,7 +135,8 @@ public boolean equals( Object o )
121135
return false;
122136
}
123137
SessionConfig that = (SessionConfig) o;
124-
return Objects.equals( bookmarks, that.bookmarks ) && defaultAccessMode == that.defaultAccessMode && Objects.equals( database, that.database );
138+
return Objects.equals( bookmarks, that.bookmarks ) && defaultAccessMode == that.defaultAccessMode && Objects.equals( database, that.database )
139+
&& Objects.equals( fetchSize, that.fetchSize );
125140
}
126141

127142
@Override
@@ -133,14 +148,16 @@ public int hashCode()
133148
@Override
134149
public String toString()
135150
{
136-
return "SessionParameters{" + "bookmarks=" + bookmarks + ", defaultAccessMode=" + defaultAccessMode + ", database='" + database + '\'' + '}';
151+
return "SessionParameters{" + "bookmarks=" + bookmarks + ", defaultAccessMode=" + defaultAccessMode + ", database='" + database + '\'' +
152+
", fetchSize=" + fetchSize + '}';
137153
}
138154

139155
/**
140156
* Builder used to configure {@link SessionConfig} which will be used to create a session.
141157
*/
142158
public static class Builder
143159
{
160+
private Optional<Long> fetchSize = Optional.empty();
144161
private Iterable<Bookmark> bookmarks = null;
145162
private AccessMode defaultAccessMode = AccessMode.WRITE;
146163
private String database = null;
@@ -230,6 +247,27 @@ public Builder withDatabase( String database )
230247
return this;
231248
}
232249

250+
/**
251+
* Specify how many records to fetch in each batch for this session.
252+
* This config will overrides the default value set on {@link Config#fetchSize()}.
253+
* This config is only valid when the driver is used with servers that support Bolt V4 (Server version 4.0 and later).
254+
*
255+
* Bolt V4 enables pulling records in batches to allow client to take control of data population and apply back pressure to server.
256+
* This config specifies the default fetch size for all query runs using {@link Session} and {@link AsyncSession}.
257+
* By default, the value is set to {@code 1000}.
258+
* Use {@code -1} to disables back pressure and config client to pull all records at once after each run.
259+
*
260+
* This config only applies to run result obtained via {@link Session} and {@link AsyncSession}.
261+
* As with {@link RxSession}, the batch size is provided via {@link Subscription#request(long)} instead.
262+
* @param size the default record fetch size when pulling records in batches using Bolt V4.
263+
* @return this builder
264+
*/
265+
public Builder withFetchSize( long size )
266+
{
267+
this.fetchSize = Optional.of( assertValidFetchSize( size ) );
268+
return this;
269+
}
270+
233271
public SessionConfig build()
234272
{
235273
return new SessionConfig( this );

driver/src/main/java/org/neo4j/driver/StatementResult.java

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@
2020

2121
import java.util.Iterator;
2222
import java.util.List;
23+
import java.util.function.Function;
2324
import java.util.stream.Stream;
2425

2526
import org.neo4j.driver.exceptions.NoSuchRecordException;
2627
import org.neo4j.driver.summary.ResultSummary;
27-
import java.util.function.Function;
2828
import org.neo4j.driver.util.Resource;
2929

3030

@@ -138,29 +138,12 @@ public interface StatementResult extends Iterator<Record>
138138
*/
139139
<T> List<T> list( Function<Record, T> mapFunction );
140140

141-
/**
142-
* Consume the entire result, yielding a summary of it.
143-
*
144-
* Calling this method exhausts the result.
145-
*
146-
* <pre class="doctest:ResultDocIT#summarizeUsage">
147-
* {@code
148-
* ResultSummary summary = session.run( "PROFILE MATCH (n:User {id: 12345}) RETURN n" ).consume();
149-
* }
150-
* </pre>
151-
*
152-
* @return a summary for the whole query result
153-
*/
154-
ResultSummary consume();
155-
156141
/**
157142
* Return the result summary.
158143
*
159-
* If the records in the result is not fully consumed, then calling this method will force to pull all remaining
160-
* records into buffer to yield the summary.
144+
* If the records in the result is not fully consumed, then calling this method will exhausts the result.
161145
*
162-
* If you want to obtain the summary but discard the records, use
163-
* {@link StatementResult#consume()} instead.
146+
* If you want to access unconsumed records after summary, you shall use {@link StatementResult#list()} to buffer all records into memory before summary.
164147
*
165148
* @return a summary for the whole query result.
166149
*/

driver/src/main/java/org/neo4j/driver/async/StatementResultCursor.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import org.neo4j.driver.Record;
2929
import org.neo4j.driver.Records;
30+
import org.neo4j.driver.StatementResult;
3031
import org.neo4j.driver.exceptions.NoSuchRecordException;
3132
import org.neo4j.driver.summary.ResultSummary;
3233

@@ -72,10 +73,9 @@ public interface StatementResultCursor
7273
/**
7374
* Asynchronously retrieve the result summary.
7475
* <p>
75-
* If the records in the result is not fully consumed, then calling this method will force to pull all remaining
76-
* records into buffer to yield the summary.
76+
* If the records in the result is not fully consumed, then calling this method will exhausts the result.
7777
* <p>
78-
* If you want to obtain the summary but discard the records, use {@link #consumeAsync()} instead.
78+
* If you want to access unconsumed records after summary, you shall use {@link StatementResult#list()} to buffer all records into memory before summary.
7979
*
8080
* @return a {@link CompletionStage} completed with a summary for the whole query result. Stage can also be
8181
* completed exceptionally if query execution fails.
@@ -110,14 +110,6 @@ public interface StatementResultCursor
110110
*/
111111
CompletionStage<Record> singleAsync();
112112

113-
/**
114-
* Asynchronously consume the entire result, yielding a summary of it. Calling this method exhausts the result.
115-
*
116-
* @return a {@link CompletionStage} completed with a summary for the whole query result. Stage can also be
117-
* completed exceptionally if query execution fails.
118-
*/
119-
CompletionStage<ResultSummary> consumeAsync();
120-
121113
/**
122114
* Asynchronously apply the given {@link Consumer action} to every record in the result, yielding a summary of it.
123115
*

driver/src/main/java/org/neo4j/driver/internal/FailableCursor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,6 @@
2222

2323
public interface FailableCursor
2424
{
25+
CompletionStage<Throwable> consumeAsync();
2526
CompletionStage<Throwable> failureAsync();
2627
}

driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,10 +164,10 @@ private static RuntimeException driverCloseException()
164164
return new IllegalStateException( "This driver instance has already been closed" );
165165
}
166166

167-
public NetworkSession newSession( SessionConfig parameters )
167+
public NetworkSession newSession( SessionConfig config )
168168
{
169169
assertOpen();
170-
NetworkSession session = sessionFactory.newInstance( parameters );
170+
NetworkSession session = sessionFactory.newInstance( config );
171171
if ( closed.get() )
172172
{
173173
// session does not immediately acquire connection, it is fine to just throw

driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,18 @@
2222
import java.util.Spliterator;
2323
import java.util.Spliterators;
2424
import java.util.concurrent.CompletionStage;
25+
import java.util.function.Function;
2526
import java.util.stream.Stream;
2627
import java.util.stream.StreamSupport;
2728

28-
import org.neo4j.driver.internal.spi.Connection;
29-
import org.neo4j.driver.internal.util.Futures;
3029
import org.neo4j.driver.Record;
3130
import org.neo4j.driver.StatementResult;
3231
import org.neo4j.driver.async.StatementResultCursor;
3332
import org.neo4j.driver.exceptions.ClientException;
3433
import org.neo4j.driver.exceptions.NoSuchRecordException;
34+
import org.neo4j.driver.internal.spi.Connection;
35+
import org.neo4j.driver.internal.util.Futures;
3536
import org.neo4j.driver.summary.ResultSummary;
36-
import java.util.function.Function;
3737

3838
public class InternalStatementResult implements StatementResult
3939
{
@@ -111,12 +111,6 @@ public <T> List<T> list( Function<Record, T> mapFunction )
111111
return blockingGet( cursor.listAsync( mapFunction ) );
112112
}
113113

114-
@Override
115-
public ResultSummary consume()
116-
{
117-
return blockingGet( cursor.consumeAsync() );
118-
}
119-
120114
@Override
121115
public ResultSummary summary()
122116
{

driver/src/main/java/org/neo4j/driver/internal/SessionFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
public interface SessionFactory
2727
{
28-
NetworkSession newInstance( SessionConfig parameters );
28+
NetworkSession newInstance( SessionConfig sessionConfig );
2929

3030
CompletionStage<Void> verifyConnectivity();
3131

driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,21 +36,28 @@ public class SessionFactoryImpl implements SessionFactory
3636
private final RetryLogic retryLogic;
3737
private final Logging logging;
3838
private final boolean leakedSessionsLoggingEnabled;
39+
private final long defaultFetchSize;
3940

4041
SessionFactoryImpl( ConnectionProvider connectionProvider, RetryLogic retryLogic, Config config )
4142
{
4243
this.connectionProvider = connectionProvider;
4344
this.leakedSessionsLoggingEnabled = config.logLeakedSessions();
4445
this.retryLogic = retryLogic;
4546
this.logging = config.logging();
47+
this.defaultFetchSize = config.fetchSize();
4648
}
4749

4850
@Override
4951
public NetworkSession newInstance( SessionConfig sessionConfig )
5052
{
5153
BookmarkHolder bookmarkHolder = new DefaultBookmarkHolder( InternalBookmark.from( sessionConfig.bookmarks() ) );
5254
return createSession( connectionProvider, retryLogic, parseDatabaseName( sessionConfig ),
53-
sessionConfig.defaultAccessMode(), bookmarkHolder, logging );
55+
sessionConfig.defaultAccessMode(), bookmarkHolder, parseFetchSize( sessionConfig ), logging );
56+
}
57+
58+
private long parseFetchSize( SessionConfig sessionConfig )
59+
{
60+
return sessionConfig.fetchSize().orElse( defaultFetchSize );
5461
}
5562

5663
private DatabaseName parseDatabaseName( SessionConfig sessionConfig )
@@ -85,10 +92,10 @@ public ConnectionProvider getConnectionProvider()
8592
}
8693

8794
private NetworkSession createSession( ConnectionProvider connectionProvider, RetryLogic retryLogic, DatabaseName databaseName, AccessMode mode,
88-
BookmarkHolder bookmarkHolder, Logging logging )
95+
BookmarkHolder bookmarkHolder, long fetchSize, Logging logging )
8996
{
9097
return leakedSessionsLoggingEnabled
91-
? new LeakLoggingNetworkSession( connectionProvider, retryLogic, databaseName, mode, bookmarkHolder, logging )
92-
: new NetworkSession( connectionProvider, retryLogic, databaseName, mode, bookmarkHolder, logging );
98+
? new LeakLoggingNetworkSession( connectionProvider, retryLogic, databaseName, mode, bookmarkHolder, fetchSize, logging )
99+
: new NetworkSession( connectionProvider, retryLogic, databaseName, mode, bookmarkHolder, fetchSize, logging );
93100
}
94101
}

0 commit comments

Comments
 (0)