-
Notifications
You must be signed in to change notification settings - Fork 155
Pulling in batches for async and blocking API with BOLT V4. #637
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…going. As a result `StatementResult#consume` is removed as it is the same as `StatementResult#summary` Added tests back.
…ords. Feature left: Nested session runs should buffer all unconsumed records into memory. AutoPull handler does not support auto read depending on local record buffer.
7c2baed
to
fa0f6cc
Compare
…nto memory. This enables nested session runs.
fa0f6cc
to
da8e744
Compare
6e21f34
to
d637500
Compare
…pull messages. When creating the async result, we write a RUN message, followed by a PULL message. The RUN and PULL messages shall be flushed together. If RUN and PULL are flushed separately, the following scenario may happen: C: RUN "RETURN Wrong" {} {mode="r"} S: FAILURE Neo.ClientError.Statement.SyntaxError "Variable `Wrong` not defined (line 1, column 8 (offset: 7)) C: RESET C: PULL {n=1000} S: SUCCESS {} S: FAILURE Neo.ClientError.Request.Invalid "Message 'PULL Map{n -> Long(1000)}' cannot be handled by a session in the READY state."
d637500
to
9014d06
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
General remark: I like the removal of consume
. It will break things for people, but will eventually be much more clear than the dualism of summary()
and consume()
.
Offering a higher level method like the following
@Override
public ResultSummary run() {
try (AutoCloseableStatementRunner statementRunner = getStatementRunner(this.targetDatabase)) {
StatementResult result = runnableStatement.runWith(statementRunner);
return result.consume();
}
}
will not be longer possible, right?
And would need to look like this
@Override
public ResultSummary run() {
try (AutoCloseableStatementRunner statementRunner = getStatementRunner(this.targetDatabase)) {
StatementResult result = runnableStatement.runWith(statementRunner);
while(result.hasNext()) {
result.next();
}
return result.summary();
}
}
or doing it with .list()
and drop the returned values.
Having said that, I would like an overloaded summary
taking in a boolean
to indicate to actually not only exhaust the current batch, but everything.
But than, why removing consume()
?
/** | ||
* Return the result summary. | ||
* | ||
* If the records in the result is not fully consumed, then calling this method will force to pull all remaining | ||
* records into buffer to yield the summary. | ||
* If the records in the result is not fully consumed, then calling this method will exhausts the result. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe clarify that exhausting means exhausting the current batch, not the whole possible result set / size.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, it will exhausting the whole result. It is equivalent to Subscription#cancel
.
We currently cannot discard N, we can only discard ALL.
The following code is equivalent to a driver user:
// Code with 1.7 driver
public ResultSummary run() {
try (AutoCloseableStatementRunner statementRunner = getStatementRunner(this.targetDatabase)) {
StatementResult result = runnableStatement.runWith(statementRunner);
return result.consume();
}
}
// Code with 4.0 driver
public ResultSummary run() {
try (AutoCloseableStatementRunner statementRunner = getStatementRunner(this.targetDatabase)) {
StatementResult result = runnableStatement.runWith(statementRunner);
return result.summary();
}
}
@@ -164,10 +164,10 @@ private static RuntimeException driverCloseException() | |||
return new IllegalStateException( "This driver instance has already been closed" ); | |||
} | |||
|
|||
public NetworkSession newSession( SessionConfig parameters ) | |||
public NetworkSession newSession( SessionConfig config ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unrelated, but good catch 👍
@@ -22,5 +22,6 @@ | |||
|
|||
public interface FailableCursor | |||
{ | |||
CompletionStage<Throwable> consumeAsync(); | |||
CompletionStage<Throwable> failureAsync(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this still needed? At least from what I read, you using consumeAsync
now where failureAsync
on the failable cursor has been used before.
I found that the failable cursor is used as interface for the RxStatementResultCursor
as well… Hmm, it feels like that area could need some unification as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The different of these two method:
consumeAsync: Discard all unconsumed records and return if any error for the whole execution and streaming. Used in StatementRunner#close
, where after this boundary, all records are discarded.
failureAsync: Buffer all unconsumed records into memory and return if any error for the whole execution and streaming. Used for nested queries between session#run
s, so that the second run will not discard all previous unconsumed run records.
@@ -194,7 +196,7 @@ public boolean isOpen() | |||
if ( cursor != null ) | |||
{ | |||
// there exists a cursor with potentially unconsumed error, try to extract and propagate it | |||
return cursor.failureAsync(); | |||
return cursor.consumeAsync(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is one of the places I mentioned above…
Wouldn't that change be applicable to other places where failureAsync
is used?
@@ -74,6 +74,6 @@ private static Throwable findFirstFailure( CompletableFuture<Throwable>[] comple | |||
{ | |||
return cursorStage | |||
.exceptionally( cursor -> null ) | |||
.thenCompose( cursor -> cursor == null ? completedWithNull() : cursor.failureAsync() ); | |||
.thenCompose( cursor -> cursor == null ? completedWithNull() : cursor.consumeAsync() ); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same.
import org.neo4j.driver.summary.ResultSummary; | ||
|
||
public class AsyncStatementResultCursor implements InternalStatementResultCursor | ||
public class AsyncStatementResultCursorImpl implements AsyncStatementResultCursor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Big 👍
* | onRecord | X | X | yield record ->STREAMING | X | ->CANCELED | | ||
* | onFailure | X | X | ->FAILED | X | ->FAILED | | ||
* | ||
* Currently the error state (marked with X on the table above) might not be enforced. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DONE
is an error state?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it is Done, then there is no further transition you can go from Done state.
The top row of the table defines what state you are currently in. Then the first column defines what action you can perform.
I know this is a bit shaky. I will add a card to improve this class with a proper state machine as you pointed out.
*/ | ||
void installSummaryConsumer( BiConsumer<ResultSummary,Throwable> summaryConsumer ); | ||
|
||
enum Status |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be useful to find the state transitions here.
This provides a simple implementation to pull records in batches.
The idea is that when we start, we prefetch default fetch size (1000) records, once the driver receives
SUCCESS {has_more=true}
, then we automatically request a new batch from the server.A driver user could cancel the streaming at any time by calling
StatementResult#summary
orStatementRunner#close
.Changes to Driver API
StatementResult#summary
will actually behave likeStatementResult#consume
. It cancels streaming if the streaming has not been finished.StatementResult#consume
is removed from public API.StatementResult
cannot be accessed onceStatementRunner#close
is called.Nested queries with
StatementRunner#run
is still supported.