Skip to content

Commit 47f709d

Browse files
author
Zhen Li
committed
Made Statement#summary to cancel streaming if streaming is still ongoing.
As a result `StatementResult#consume` is removed as it is the same as `StatementResult#summary` Added tests back.
1 parent d100fca commit 47f709d

File tree

73 files changed

+1772
-587
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

73 files changed

+1772
-587
lines changed

driver/src/main/java/org/neo4j/driver/StatementResult.java

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@
2020

2121
import java.util.Iterator;
2222
import java.util.List;
23+
import java.util.function.Function;
2324
import java.util.stream.Stream;
2425

2526
import org.neo4j.driver.exceptions.NoSuchRecordException;
2627
import org.neo4j.driver.summary.ResultSummary;
27-
import java.util.function.Function;
2828
import org.neo4j.driver.util.Resource;
2929

3030

@@ -138,29 +138,13 @@ public interface StatementResult extends Iterator<Record>
138138
*/
139139
<T> List<T> list( Function<Record, T> mapFunction );
140140

141-
/**
142-
* Consume the entire result, yielding a summary of it.
143-
*
144-
* Calling this method exhausts the result.
145-
*
146-
* <pre class="doctest:ResultDocIT#summarizeUsage">
147-
* {@code
148-
* ResultSummary summary = session.run( "PROFILE MATCH (n:User {id: 12345}) RETURN n" ).consume();
149-
* }
150-
* </pre>
151-
*
152-
* @return a summary for the whole query result
153-
*/
154-
ResultSummary consume();
155-
156141
/**
157142
* Return the result summary.
158143
*
159-
* If the records in the result is not fully consumed, then calling this method will force to pull all remaining
160-
* records into buffer to yield the summary.
144+
* If the records in the result is not fully consumed, then calling this method will exhausts the result.
161145
*
162-
* If you want to obtain the summary but discard the records, use
163-
* {@link StatementResult#consume()} instead.
146+
* If you want to obtain the summary without discard the records, invoke
147+
* {@link StatementResult#list()} before calling this method to buffer all records into memory.
164148
*
165149
* @return a summary for the whole query result.
166150
*/

driver/src/main/java/org/neo4j/driver/async/StatementResultCursor.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,10 @@ public interface StatementResultCursor
7272
/**
7373
* Asynchronously retrieve the result summary.
7474
* <p>
75-
* If the records in the result is not fully consumed, then calling this method will force to pull all remaining
76-
* records into buffer to yield the summary.
75+
* If the records in the result is not fully consumed, then calling this method will exhausts the result.
7776
* <p>
78-
* If you want to obtain the summary but discard the records, use {@link #consumeAsync()} instead.
77+
* If you want to obtain the summary without discarding the records, invoke {@link #listAsync()}
78+
* to buffer records into memory before calling this method.
7979
*
8080
* @return a {@link CompletionStage} completed with a summary for the whole query result. Stage can also be
8181
* completed exceptionally if query execution fails.
@@ -110,14 +110,6 @@ public interface StatementResultCursor
110110
*/
111111
CompletionStage<Record> singleAsync();
112112

113-
/**
114-
* Asynchronously consume the entire result, yielding a summary of it. Calling this method exhausts the result.
115-
*
116-
* @return a {@link CompletionStage} completed with a summary for the whole query result. Stage can also be
117-
* completed exceptionally if query execution fails.
118-
*/
119-
CompletionStage<ResultSummary> consumeAsync();
120-
121113
/**
122114
* Asynchronously apply the given {@link Consumer action} to every record in the result, yielding a summary of it.
123115
*

driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,18 @@
2222
import java.util.Spliterator;
2323
import java.util.Spliterators;
2424
import java.util.concurrent.CompletionStage;
25+
import java.util.function.Function;
2526
import java.util.stream.Stream;
2627
import java.util.stream.StreamSupport;
2728

28-
import org.neo4j.driver.internal.spi.Connection;
29-
import org.neo4j.driver.internal.util.Futures;
3029
import org.neo4j.driver.Record;
3130
import org.neo4j.driver.StatementResult;
3231
import org.neo4j.driver.async.StatementResultCursor;
3332
import org.neo4j.driver.exceptions.ClientException;
3433
import org.neo4j.driver.exceptions.NoSuchRecordException;
34+
import org.neo4j.driver.internal.spi.Connection;
35+
import org.neo4j.driver.internal.util.Futures;
3536
import org.neo4j.driver.summary.ResultSummary;
36-
import java.util.function.Function;
3737

3838
public class InternalStatementResult implements StatementResult
3939
{
@@ -111,12 +111,6 @@ public <T> List<T> list( Function<Record, T> mapFunction )
111111
return blockingGet( cursor.listAsync( mapFunction ) );
112112
}
113113

114-
@Override
115-
public ResultSummary consume()
116-
{
117-
return blockingGet( cursor.consumeAsync() );
118-
}
119-
120114
@Override
121115
public ResultSummary summary()
122116
{

driver/src/main/java/org/neo4j/driver/internal/async/ExplicitTransaction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import org.neo4j.driver.exceptions.ClientException;
3030
import org.neo4j.driver.internal.BookmarkHolder;
3131
import org.neo4j.driver.internal.InternalBookmark;
32-
import org.neo4j.driver.internal.cursor.InternalStatementResultCursor;
32+
import org.neo4j.driver.internal.cursor.AsyncStatementResultCursor;
3333
import org.neo4j.driver.internal.cursor.RxStatementResultCursor;
3434
import org.neo4j.driver.internal.messaging.BoltProtocol;
3535
import org.neo4j.driver.internal.spi.Connection;
@@ -139,7 +139,7 @@ else if ( state == State.ROLLED_BACK )
139139
public CompletionStage<StatementResultCursor> runAsync( Statement statement, boolean waitForRunResponse )
140140
{
141141
ensureCanRunQueries();
142-
CompletionStage<InternalStatementResultCursor> cursorStage =
142+
CompletionStage<AsyncStatementResultCursor> cursorStage =
143143
protocol.runInExplicitTransaction( connection, statement, this, waitForRunResponse ).asyncResult();
144144
resultCursors.add( cursorStage );
145145
return cursorStage.thenApply( cursor -> cursor );

driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
import org.neo4j.driver.internal.DatabaseName;
3535
import org.neo4j.driver.internal.FailableCursor;
3636
import org.neo4j.driver.internal.InternalBookmark;
37-
import org.neo4j.driver.internal.cursor.InternalStatementResultCursor;
37+
import org.neo4j.driver.internal.cursor.AsyncStatementResultCursor;
3838
import org.neo4j.driver.internal.cursor.RxStatementResultCursor;
3939
import org.neo4j.driver.internal.cursor.StatementResultCursorFactory;
4040
import org.neo4j.driver.internal.logging.PrefixedLogger;
@@ -76,7 +76,7 @@ public NetworkSession( ConnectionProvider connectionProvider, RetryLogic retryLo
7676

7777
public CompletionStage<StatementResultCursor> runAsync( Statement statement, TransactionConfig config, boolean waitForRunResponse )
7878
{
79-
CompletionStage<InternalStatementResultCursor> newResultCursorStage =
79+
CompletionStage<AsyncStatementResultCursor> newResultCursorStage =
8080
buildResultCursorFactory( statement, config, waitForRunResponse ).thenCompose( StatementResultCursorFactory::asyncResult );
8181

8282
resultCursorStage = newResultCursorStage.exceptionally( error -> null );

driver/src/main/java/org/neo4j/driver/internal/cursor/InternalStatementResultCursor.java renamed to driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncStatementResultCursor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,6 @@
2121
import org.neo4j.driver.internal.FailableCursor;
2222
import org.neo4j.driver.async.StatementResultCursor;
2323

24-
public interface InternalStatementResultCursor extends StatementResultCursor, FailableCursor
24+
public interface AsyncStatementResultCursor extends StatementResultCursor, FailableCursor
2525
{
2626
}

driver/src/main/java/org/neo4j/driver/internal/async/AsyncStatementResultCursor.java renamed to driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncStatementResultCursorImpl.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,28 +16,27 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
package org.neo4j.driver.internal.async;
19+
package org.neo4j.driver.internal.cursor;
2020

2121
import java.util.List;
2222
import java.util.concurrent.CompletableFuture;
2323
import java.util.concurrent.CompletionStage;
2424
import java.util.function.Consumer;
2525
import java.util.function.Function;
2626

27+
import org.neo4j.driver.Record;
28+
import org.neo4j.driver.exceptions.NoSuchRecordException;
2729
import org.neo4j.driver.internal.handlers.PullAllResponseHandler;
2830
import org.neo4j.driver.internal.handlers.RunResponseHandler;
2931
import org.neo4j.driver.internal.util.Futures;
30-
import org.neo4j.driver.internal.cursor.InternalStatementResultCursor;
31-
import org.neo4j.driver.Record;
32-
import org.neo4j.driver.exceptions.NoSuchRecordException;
3332
import org.neo4j.driver.summary.ResultSummary;
3433

35-
public class AsyncStatementResultCursor implements InternalStatementResultCursor
34+
public class AsyncStatementResultCursorImpl implements AsyncStatementResultCursor
3635
{
3736
private final RunResponseHandler runHandler;
3837
private final PullAllResponseHandler pullAllHandler;
3938

40-
public AsyncStatementResultCursor( RunResponseHandler runHandler, PullAllResponseHandler pullAllHandler )
39+
public AsyncStatementResultCursorImpl( RunResponseHandler runHandler, PullAllResponseHandler pullAllHandler )
4140
{
4241
this.runHandler = runHandler;
4342
this.pullAllHandler = pullAllHandler;
@@ -91,12 +90,6 @@ public CompletionStage<Record> singleAsync()
9190
} );
9291
}
9392

94-
@Override
95-
public CompletionStage<ResultSummary> consumeAsync()
96-
{
97-
return pullAllHandler.summaryAsync();
98-
}
99-
10093
@Override
10194
public CompletionStage<ResultSummary> forEachAsync( Consumer<Record> action )
10295
{

driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncResultCursorOnlyFactory.java renamed to driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncStatementResultCursorOnlyFactory.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,30 +20,28 @@
2020

2121
import java.util.concurrent.CompletionStage;
2222

23-
import org.neo4j.driver.internal.async.AsyncStatementResultCursor;
23+
import org.neo4j.driver.exceptions.ClientException;
2424
import org.neo4j.driver.internal.handlers.PullAllResponseHandler;
2525
import org.neo4j.driver.internal.handlers.RunResponseHandler;
2626
import org.neo4j.driver.internal.messaging.Message;
2727
import org.neo4j.driver.internal.spi.Connection;
2828
import org.neo4j.driver.internal.util.Futures;
29-
import org.neo4j.driver.exceptions.ClientException;
3029

3130
import static java.util.Objects.requireNonNull;
3231
import static java.util.concurrent.CompletableFuture.completedFuture;
33-
import static org.neo4j.driver.internal.messaging.request.PullAllMessage.PULL_ALL;
3432

3533
/**
3634
* Used by Bolt V1, V2, V3
3735
*/
38-
public class AsyncResultCursorOnlyFactory implements StatementResultCursorFactory
36+
public class AsyncStatementResultCursorOnlyFactory implements StatementResultCursorFactory
3937
{
4038
protected final Connection connection;
4139
protected final Message runMessage;
4240
protected final RunResponseHandler runHandler;
4341
protected final PullAllResponseHandler pullAllHandler;
4442
private final boolean waitForRunResponse;
4543

46-
public AsyncResultCursorOnlyFactory( Connection connection, Message runMessage, RunResponseHandler runHandler,
44+
public AsyncStatementResultCursorOnlyFactory( Connection connection, Message runMessage, RunResponseHandler runHandler,
4745
PullAllResponseHandler pullHandler, boolean waitForRunResponse )
4846
{
4947
requireNonNull( connection );
@@ -59,19 +57,20 @@ public AsyncResultCursorOnlyFactory( Connection connection, Message runMessage,
5957
this.waitForRunResponse = waitForRunResponse;
6058
}
6159

62-
public CompletionStage<InternalStatementResultCursor> asyncResult()
60+
public CompletionStage<AsyncStatementResultCursor> asyncResult()
6361
{
6462
// only write and flush messages when async result is wanted.
65-
connection.writeAndFlush( runMessage, runHandler, PULL_ALL, pullAllHandler );
63+
connection.writeAndFlush( runMessage, runHandler );
64+
pullAllHandler.prePopulateRecords();
6665

6766
if ( waitForRunResponse )
6867
{
6968
// wait for response of RUN before proceeding
70-
return runHandler.runFuture().thenApply( ignore -> new AsyncStatementResultCursor( runHandler, pullAllHandler ) );
69+
return runHandler.runFuture().thenApply( ignore -> new AsyncStatementResultCursorImpl( runHandler, pullAllHandler ) );
7170
}
7271
else
7372
{
74-
return completedFuture( new AsyncStatementResultCursor( runHandler, pullAllHandler ) );
73+
return completedFuture( new AsyncStatementResultCursorImpl( runHandler, pullAllHandler ) );
7574
}
7675
}
7776

driver/src/main/java/org/neo4j/driver/internal/cursor/RxStatementResultCursor.java

Lines changed: 6 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -21,129 +21,20 @@
2121
import org.reactivestreams.Subscription;
2222

2323
import java.util.List;
24-
import java.util.Objects;
25-
import java.util.concurrent.CompletableFuture;
2624
import java.util.concurrent.CompletionStage;
2725
import java.util.function.BiConsumer;
2826

29-
import org.neo4j.driver.internal.FailableCursor;
30-
import org.neo4j.driver.internal.handlers.RunResponseHandler;
31-
import org.neo4j.driver.internal.handlers.pulln.PullResponseHandler;
3227
import org.neo4j.driver.Record;
28+
import org.neo4j.driver.internal.FailableCursor;
3329
import org.neo4j.driver.summary.ResultSummary;
3430

35-
import static org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler.DISCARD_RECORD_CONSUMER;
36-
37-
public class RxStatementResultCursor implements Subscription, FailableCursor
31+
public interface RxStatementResultCursor extends Subscription, FailableCursor
3832
{
39-
private final RunResponseHandler runHandler;
40-
private final PullResponseHandler pullHandler;
41-
private final Throwable runResponseError;
42-
private final CompletableFuture<ResultSummary> summaryFuture = new CompletableFuture<>();
43-
private BiConsumer<Record,Throwable> recordConsumer;
44-
45-
public RxStatementResultCursor( RunResponseHandler runHandler, PullResponseHandler pullHandler )
46-
{
47-
this( null, runHandler, pullHandler );
48-
}
49-
50-
public RxStatementResultCursor( Throwable runError, RunResponseHandler runHandler, PullResponseHandler pullHandler )
51-
{
52-
Objects.requireNonNull( runHandler );
53-
Objects.requireNonNull( pullHandler );
54-
assertRunResponseArrived( runHandler );
55-
56-
this.runResponseError = runError;
57-
this.runHandler = runHandler;
58-
this.pullHandler = pullHandler;
59-
installSummaryConsumer();
60-
}
61-
62-
public List<String> keys()
63-
{
64-
return runHandler.statementKeys();
65-
}
66-
67-
public void installRecordConsumer( BiConsumer<Record,Throwable> recordConsumer )
68-
{
69-
if ( isRecordConsumerInstalled() )
70-
{
71-
return;
72-
}
73-
this.recordConsumer = recordConsumer;
74-
pullHandler.installRecordConsumer( this.recordConsumer );
75-
assertRunCompletedSuccessfully();
76-
}
77-
78-
private boolean isRecordConsumerInstalled()
79-
{
80-
return this.recordConsumer != null;
81-
}
82-
83-
public void request( long n )
84-
{
85-
pullHandler.request( n );
86-
}
87-
88-
@Override
89-
public void cancel()
90-
{
91-
pullHandler.cancel();
92-
}
93-
94-
@Override
95-
public CompletionStage<Throwable> failureAsync()
96-
{
97-
// calling this method will enforce discarding record stream and finish running cypher query
98-
return summaryAsync().thenApply( summary -> (Throwable) null ).exceptionally( error -> error );
99-
}
100-
101-
public CompletionStage<ResultSummary> summaryAsync()
102-
{
103-
if ( !isDone() ) // the summary is called before record streaming
104-
{
105-
installRecordConsumer( DISCARD_RECORD_CONSUMER );
106-
cancel();
107-
}
108-
109-
return this.summaryFuture;
110-
}
111-
112-
public boolean isDone()
113-
{
114-
return summaryFuture.isDone();
115-
}
33+
List<String> keys();
11634

117-
private void assertRunCompletedSuccessfully()
118-
{
119-
if ( runResponseError != null )
120-
{
121-
pullHandler.onFailure( runResponseError );
122-
}
123-
}
35+
void installRecordConsumer( BiConsumer<Record,Throwable> recordConsumer );
12436

125-
private void installSummaryConsumer()
126-
{
127-
pullHandler.installSummaryConsumer( ( summary, error ) -> {
128-
if ( error != null && recordConsumer == DISCARD_RECORD_CONSUMER )
129-
{
130-
// We will only report the error to summary if there is no user record consumer installed
131-
// When a user record consumer is installed, the error will be reported to record consumer instead.
132-
summaryFuture.completeExceptionally( error );
133-
}
134-
else if ( summary != null )
135-
{
136-
summaryFuture.complete( summary );
137-
}
138-
//else (null, null) to indicate a has_more success
139-
} );
140-
}
37+
CompletionStage<ResultSummary> summaryAsync();
14138

142-
private void assertRunResponseArrived( RunResponseHandler runHandler )
143-
{
144-
if ( !runHandler.runFuture().isDone() )
145-
{
146-
throw new IllegalStateException( "Should wait for response of RUN before allowing PULL." );
147-
}
148-
}
39+
boolean isDone();
14940
}

0 commit comments

Comments
 (0)