Pulling in batches for async and blocking API with BOLT V4. #637

Merged
zhenlineo merged 6 commits into neo4j:4.0 from 4.0-aync-pull-n
Oct 15, 2019

Conversation

zhenlineo
Contributor

@zhenlineo zhenlineo commented Oct 10, 2019

This provides a simple implementation to pull records in batches.
The idea is that when we start, we prefetch one batch of the default fetch size (1000 records). Once the driver receives SUCCESS {has_more=true}, it automatically requests a new batch from the server.

A driver user could cancel the streaming at any time by calling StatementResult#summary or StatementRunner#close.
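To make the batching idea concrete, here is a minimal sketch of the auto-pull loop. It is not the driver's code: `AutoPullSketch`, `FakeServer`, `pullOnce`, and `pullAll` are hypothetical names, the boolean returned by `pull` stands in for `has_more`, and cancelling here only stops further requests (the sketch does not model discarding on the server side).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not driver code: models auto-pull with a fixed fetch size.
final class AutoPullSketch {
    // Stand-in for the server: holds `total` records and answers PULL {n}.
    static final class FakeServer {
        private final int total;
        private int sent = 0;
        int pullRequests = 0;

        FakeServer(int total) { this.total = total; }

        // Delivers up to n records into sink; the return value plays the role of has_more.
        boolean pull(int n, List<Integer> sink) {
            pullRequests++;
            int end = Math.min(total, sent + n);
            while (sent < end) {
                sink.add(sent++);
            }
            return sent < total;
        }
    }

    private final FakeServer server;
    private final int fetchSize;
    private final List<Integer> buffer = new ArrayList<>();
    private boolean hasMore = true;
    private boolean cancelled = false;

    AutoPullSketch(FakeServer server, int fetchSize) {
        this.server = server;
        this.fetchSize = fetchSize;
    }

    // Requests one batch, unless the stream is finished or cancelled.
    void pullOnce() {
        if (hasMore && !cancelled) {
            hasMore = server.pull(fetchSize, buffer);
        }
    }

    // Keeps requesting batches while the server reports has_more=true.
    List<Integer> pullAll() {
        while (hasMore && !cancelled) {
            pullOnce();
        }
        return buffer;
    }

    // Stops further batches from being requested.
    void cancel() { cancelled = true; }

    int buffered() { return buffer.size(); }
}
```

With 2500 records and a fetch size of 1000, `pullAll()` issues three requests; calling `cancel()` after the first batch leaves only that batch in the buffer.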

Changes to Driver API

  • StatementResult#summary will actually behave like StatementResult#consume. It cancels streaming if the streaming has not been finished.
  • StatementResult#consume is removed from public API.
  • Unconsumed records in StatementResult cannot be accessed once StatementRunner#close is called.

Nested queries with StatementRunner#run are still supported.

Zhen Li added 3 commits October 8, 2019 12:18
…going.

As a result `StatementResult#consume` is removed as it is the same as `StatementResult#summary`
Added tests back.
…ords.

Feature left:
Nested session runs should buffer all unconsumed records into memory.
AutoPull handler does not support auto read depending on local record buffer.
…nto memory.

This enables nested session runs.
@zhenlineo zhenlineo force-pushed the 4.0-aync-pull-n branch 3 times, most recently from 6e21f34 to d637500 Compare October 11, 2019 12:26
…pull messages.

When creating the async result, we write a RUN message, followed by a PULL message.
The RUN and PULL messages shall be flushed together.

If RUN and PULL are flushed separately, the following scenario may happen:

C: RUN "RETURN Wrong" {} {mode="r"}
S: FAILURE Neo.ClientError.Statement.SyntaxError "Variable `Wrong` not defined (line 1, column 8 (offset: 7))"
C: RESET
C: PULL {n=1000}
S: SUCCESS {}
S: FAILURE Neo.ClientError.Request.Invalid "Message 'PULL Map{n -> Long(1000)}' cannot be handled by a session in the READY state."
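The "flush together" rule can be sketched as a writer that queues messages and hands them to the network in one go, so the PULL is already on the wire before any RESET the driver might send in reaction to a failed RUN. This is an illustration only: `PipelinedWriter`, `writeRunAndPull`, and the string-based messages are hypothetical and do not reflect the driver's actual channel handling.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not driver code: messages are queued and sent per flush.
final class PipelinedWriter {
    private final List<String> pending = new ArrayList<>();
    // Each inner list is what went out in a single flush.
    private final List<List<String>> flushed = new ArrayList<>();

    // Queues a message without sending it.
    void write(String message) { pending.add(message); }

    // Sends everything queued so far as one unit.
    void flush() {
        if (!pending.isEmpty()) {
            flushed.add(new ArrayList<>(pending));
            pending.clear();
        }
    }

    // RUN and PULL are written back to back and flushed exactly once.
    void writeRunAndPull(String query, long fetchSize) {
        write("RUN " + query);
        write("PULL {n=" + fetchSize + "}");
        flush();
    }

    List<List<String>> flushed() { return flushed; }
}
```

Because nothing is flushed between the two writes, no other message can be interleaved between RUN and PULL.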
Contributor

@michael-simons michael-simons left a comment

General remark: I like the removal of consume. It will break things for people, but will eventually be much more clear than the dualism of summary() and consume().

Offering a higher level method like the following

@Override
public ResultSummary run() {

	try (AutoCloseableStatementRunner statementRunner = getStatementRunner(this.targetDatabase)) {
		StatementResult result = runnableStatement.runWith(statementRunner);
		return result.consume();
	}
}

will no longer be possible, right?

And it would need to look like this:

@Override
public ResultSummary run() {

	try (AutoCloseableStatementRunner statementRunner = getStatementRunner(this.targetDatabase)) {
		StatementResult result = runnableStatement.runWith(statementRunner);
		while (result.hasNext()) {
			result.next();
		}
		return result.summary();
	}
}

or do it with .list() and drop the returned values.

Having said that, I would like an overloaded summary that takes a boolean to indicate that it should exhaust not only the current batch, but everything.
But then, why remove consume()?

/**
* Return the result summary.
*
- * If the records in the result is not fully consumed, then calling this method will force to pull all remaining
- * records into buffer to yield the summary.
+ * If the records in the result is not fully consumed, then calling this method will exhausts the result.
Contributor

Maybe clarify that exhausting means exhausting the current batch, not the whole possible result set / size.

Contributor Author

No, it will exhaust the whole result. It is equivalent to Subscription#cancel.

We currently cannot discard N, we can only discard ALL.

To a driver user, the following two snippets are equivalent:

// Code with 1.7 driver
public ResultSummary run() {
	try (AutoCloseableStatementRunner statementRunner = getStatementRunner(this.targetDatabase)) {
		StatementResult result = runnableStatement.runWith(statementRunner);
		return result.consume();
	}
}

// Code with 4.0 driver
public ResultSummary run() {
	try (AutoCloseableStatementRunner statementRunner = getStatementRunner(this.targetDatabase)) {
		StatementResult result = runnableStatement.runWith(statementRunner);
		return result.summary();
	}
}

@@ -164,10 +164,10 @@ private static RuntimeException driverCloseException()
return new IllegalStateException( "This driver instance has already been closed" );
}

- public NetworkSession newSession( SessionConfig parameters )
+ public NetworkSession newSession( SessionConfig config )
Contributor

Unrelated, but good catch 👍

@@ -22,5 +22,6 @@

public interface FailableCursor
{
CompletionStage<Throwable> consumeAsync();
CompletionStage<Throwable> failureAsync();
Contributor

Is this still needed? At least from what I read, you are now using consumeAsync where failureAsync on the failable cursor was used before.

I found that the failable cursor is used as an interface for the RxStatementResultCursor as well… Hmm, it feels like that area could use some unification as well.

Contributor Author

The difference between these two methods:
consumeAsync: Discards all unconsumed records and returns the error, if any, from the whole execution and streaming. Used in StatementRunner#close, where all records are discarded after this boundary.
failureAsync: Buffers all unconsumed records into memory and returns the error, if any, from the whole execution and streaming. Used for nested queries between session#run calls, so that the second run does not discard the unconsumed records of the previous run.
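The contrast between the two methods can be illustrated with a toy cursor. This is a sketch under assumptions, not the driver's implementation: `ToyCursor`, its `unread` and `buffer` queues, and the `buffered()` helper are hypothetical; only the method names `consumeAsync` and `failureAsync` and their `CompletionStage<Throwable>` return type come from the diff above.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical sketch, not driver code: contrasts discarding with buffering.
final class ToyCursor {
    private final Queue<String> unread;                      // records not yet delivered
    private final Queue<String> buffer = new ArrayDeque<>(); // records kept in memory
    private final Throwable failure;                         // null when the run succeeded

    ToyCursor(List<String> records, Throwable failure) {
        this.unread = new ArrayDeque<>(records);
        this.failure = failure;
    }

    // Drops everything that has not been consumed, then reports the failure (or null).
    CompletionStage<Throwable> consumeAsync() {
        unread.clear();
        buffer.clear();
        return CompletableFuture.completedFuture(failure);
    }

    // Moves everything that has not been consumed into memory, then reports the failure (or null).
    CompletionStage<Throwable> failureAsync() {
        buffer.addAll(unread);
        unread.clear();
        return CompletableFuture.completedFuture(failure);
    }

    int buffered() { return buffer.size(); }
}
```

After `failureAsync()` the records of the first run are still readable from the buffer, which is what a nested run needs; after `consumeAsync()` nothing is left.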

@@ -194,7 +196,7 @@ public boolean isOpen()
if ( cursor != null )
{
// there exists a cursor with potentially unconsumed error, try to extract and propagate it
- return cursor.failureAsync();
+ return cursor.consumeAsync();
Contributor

This is one of the places I mentioned above…
Wouldn't that change be applicable to other places where failureAsync is used?

@@ -74,6 +74,6 @@ private static Throwable findFirstFailure( CompletableFuture<Throwable>[] comple
{
return cursorStage
.exceptionally( cursor -> null )
- .thenCompose( cursor -> cursor == null ? completedWithNull() : cursor.failureAsync() );
+ .thenCompose( cursor -> cursor == null ? completedWithNull() : cursor.consumeAsync() );
Contributor

Same.

import org.neo4j.driver.summary.ResultSummary;

- public class AsyncStatementResultCursor implements InternalStatementResultCursor
+ public class AsyncStatementResultCursorImpl implements AsyncStatementResultCursor
Contributor

Big 👍

* | onRecord | X | X | yield record ->STREAMING | X | ->CANCELED |
* | onFailure | X | X | ->FAILED | X | ->FAILED |
*
* Currently the error state (marked with X on the table above) might not be enforced.
Contributor

DONE is an error state?

Contributor Author

@zhenlineo zhenlineo Oct 15, 2019

If it is DONE, then there is no further transition you can make from the DONE state.

The top row of the table defines what state you are currently in. Then the first column defines what action you can perform.

I know this is a bit shaky. I will add a card to improve this class with a proper state machine as you pointed out.
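As a rough illustration of how to read the table (current state across the top, action down the side), the two rows quoted above could be written as an explicit transition function. The header row is not visible in the excerpt, so the state order used here (DONE, FAILED, STREAMING, READY, CANCELED), and the READY state in particular, are assumptions; `PullStateSketch` and its enums are hypothetical names. An empty result stands for an X cell.

```java
import java.util.Optional;

// Hypothetical sketch, not driver code: encodes only the onRecord and onFailure rows.
final class PullStateSketch {
    // Assumed column order; the excerpt does not show the header row.
    enum State { DONE, FAILED, STREAMING, READY, CANCELED }

    enum Event { ON_RECORD, ON_FAILURE }

    // Returns the next state, or empty when the table marks the cell with X.
    static Optional<State> next(State current, Event event) {
        switch (event) {
            case ON_RECORD:
                if (current == State.STREAMING) {
                    return Optional.of(State.STREAMING); // yield record, keep streaming
                }
                if (current == State.CANCELED) {
                    return Optional.of(State.CANCELED);  // stay cancelled
                }
                return Optional.empty();
            case ON_FAILURE:
                if (current == State.STREAMING || current == State.CANCELED) {
                    return Optional.of(State.FAILED);
                }
                return Optional.empty();
            default:
                return Optional.empty();
        }
    }
}
```

Read this way, DONE is not an error state itself; every action on it is simply marked X because no transition leaves it.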

*/
void installSummaryConsumer( BiConsumer<ResultSummary,Throwable> summaryConsumer );

enum Status
Contributor

It would be useful to find the state transitions here.

@zhenlineo zhenlineo merged commit 8d5c196 into neo4j:4.0 Oct 15, 2019
@zhenlineo zhenlineo deleted the 4.0-aync-pull-n branch October 15, 2019 11:31
@zhenlineo zhenlineo changed the title Back pressure support for async and blocking API with BOLT V4. Pulling in batches for async and blocking API with BOLT V4. Nov 1, 2019