Skip to content

Commit d100fca

Browse files
author
Zhen Li
committed
Back pressure support for async and blocking API with BOLT V4.
1 parent b083dfd commit d100fca

35 files changed

+840
-1746
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/AsyncStatementResultCursor.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,19 +34,19 @@
3434

3535
public class AsyncStatementResultCursor implements InternalStatementResultCursor
3636
{
37-
private final RunResponseHandler runResponseHandler;
37+
private final RunResponseHandler runHandler;
3838
private final PullAllResponseHandler pullAllHandler;
3939

40-
public AsyncStatementResultCursor( RunResponseHandler runResponseHandler, PullAllResponseHandler pullAllHandler )
40+
public AsyncStatementResultCursor( RunResponseHandler runHandler, PullAllResponseHandler pullAllHandler )
4141
{
42-
this.runResponseHandler = runResponseHandler;
42+
this.runHandler = runHandler;
4343
this.pullAllHandler = pullAllHandler;
4444
}
4545

4646
@Override
4747
public List<String> keys()
4848
{
49-
return runResponseHandler.statementKeys();
49+
return runHandler.statementKeys();
5050
}
5151

5252
@Override
@@ -94,7 +94,7 @@ public CompletionStage<Record> singleAsync()
9494
@Override
9595
public CompletionStage<ResultSummary> consumeAsync()
9696
{
97-
return pullAllHandler.consumeAsync();
97+
return pullAllHandler.summaryAsync();
9898
}
9999

100100
@Override
@@ -120,7 +120,7 @@ public <T> CompletionStage<List<T>> listAsync( Function<Record,T> mapFunction )
120120
@Override
121121
public CompletionStage<Throwable> failureAsync()
122122
{
123-
return pullAllHandler.failureAsync();
123+
return pullAllHandler.summaryAsync().handle( ( summary, error ) -> error );
124124
}
125125

126126
private void internalForEachAsync( Consumer<Record> action, CompletableFuture<Void> resultFuture )

driver/src/main/java/org/neo4j/driver/internal/cursor/InternalStatementResultCursorFactory.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,8 @@
2323
import org.neo4j.driver.internal.async.AsyncStatementResultCursor;
2424
import org.neo4j.driver.internal.handlers.PullAllResponseHandler;
2525
import org.neo4j.driver.internal.handlers.RunResponseHandler;
26-
import org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler;
26+
import org.neo4j.driver.internal.handlers.pulln.PullResponseHandler;
2727
import org.neo4j.driver.internal.messaging.Message;
28-
import org.neo4j.driver.internal.messaging.request.PullMessage;
2928
import org.neo4j.driver.internal.spi.Connection;
3029

3130
import static java.util.Objects.requireNonNull;
@@ -36,12 +35,12 @@ public class InternalStatementResultCursorFactory implements StatementResultCurs
3635
private final RunResponseHandler runHandler;
3736
private final Connection connection;
3837

39-
private final BasicPullResponseHandler pullHandler;
38+
private final PullResponseHandler pullHandler;
4039
private final PullAllResponseHandler pullAllHandler;
4140
private final boolean waitForRunResponse;
4241
private final Message runMessage;
4342

44-
public InternalStatementResultCursorFactory( Connection connection, Message runMessage, RunResponseHandler runHandler, BasicPullResponseHandler pullHandler,
43+
public InternalStatementResultCursorFactory( Connection connection, Message runMessage, RunResponseHandler runHandler, PullResponseHandler pullHandler,
4544
PullAllResponseHandler pullAllHandler, boolean waitForRunResponse )
4645
{
4746
requireNonNull( connection );
@@ -62,7 +61,8 @@ public InternalStatementResultCursorFactory( Connection connection, Message runM
6261
public CompletionStage<InternalStatementResultCursor> asyncResult()
6362
{
6463
// only write and flush messages when async result is wanted.
65-
connection.writeAndFlush( runMessage, runHandler, PullMessage.PULL_ALL, pullAllHandler );
64+
connection.writeAndFlush( runMessage, runHandler );
65+
pullAllHandler.prePull();
6666

6767
if ( waitForRunResponse )
6868
{

driver/src/main/java/org/neo4j/driver/internal/cursor/RxStatementResultCursor.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,26 +28,26 @@
2828

2929
import org.neo4j.driver.internal.FailableCursor;
3030
import org.neo4j.driver.internal.handlers.RunResponseHandler;
31-
import org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler;
31+
import org.neo4j.driver.internal.handlers.pulln.PullResponseHandler;
3232
import org.neo4j.driver.Record;
3333
import org.neo4j.driver.summary.ResultSummary;
3434

35-
import static org.neo4j.driver.internal.handlers.pulln.AbstractBasicPullResponseHandler.DISCARD_RECORD_CONSUMER;
35+
import static org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler.DISCARD_RECORD_CONSUMER;
3636

3737
public class RxStatementResultCursor implements Subscription, FailableCursor
3838
{
3939
private final RunResponseHandler runHandler;
40-
private final BasicPullResponseHandler pullHandler;
40+
private final PullResponseHandler pullHandler;
4141
private final Throwable runResponseError;
4242
private final CompletableFuture<ResultSummary> summaryFuture = new CompletableFuture<>();
43-
private boolean isRecordHandlerInstalled = false;
43+
private BiConsumer<Record,Throwable> recordConsumer;
4444

45-
public RxStatementResultCursor( RunResponseHandler runHandler, BasicPullResponseHandler pullHandler )
45+
public RxStatementResultCursor( RunResponseHandler runHandler, PullResponseHandler pullHandler )
4646
{
4747
this( null, runHandler, pullHandler );
4848
}
4949

50-
public RxStatementResultCursor( Throwable runError, RunResponseHandler runHandler, BasicPullResponseHandler pullHandler )
50+
public RxStatementResultCursor( Throwable runError, RunResponseHandler runHandler, PullResponseHandler pullHandler )
5151
{
5252
Objects.requireNonNull( runHandler );
5353
Objects.requireNonNull( pullHandler );
@@ -66,15 +66,20 @@ public List<String> keys()
6666

6767
public void installRecordConsumer( BiConsumer<Record,Throwable> recordConsumer )
6868
{
69-
if ( isRecordHandlerInstalled )
69+
if ( isRecordConsumerInstalled() )
7070
{
7171
return;
7272
}
73-
isRecordHandlerInstalled = true;
74-
pullHandler.installRecordConsumer( recordConsumer );
73+
this.recordConsumer = recordConsumer;
74+
pullHandler.installRecordConsumer( this.recordConsumer );
7575
assertRunCompletedSuccessfully();
7676
}
7777

78+
private boolean isRecordConsumerInstalled()
79+
{
80+
return this.recordConsumer != null;
81+
}
82+
7883
public void request( long n )
7984
{
8085
pullHandler.request( n );
@@ -120,8 +125,10 @@ private void assertRunCompletedSuccessfully()
120125
private void installSummaryConsumer()
121126
{
122127
pullHandler.installSummaryConsumer( ( summary, error ) -> {
123-
if ( error != null )
128+
if ( error != null && recordConsumer == DISCARD_RECORD_CONSUMER )
124129
{
130+
// We will only report the error to summary if there is no user record consumer installed
131+
// When a user record consumer is installed, the error will be reported to record consumer instead.
125132
summaryFuture.completeExceptionally( error );
126133
}
127134
else if ( summary != null )

0 commit comments

Comments
 (0)