diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/BasicPullResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/BasicPullResponseHandler.java index 233d0a326d..748299b99b 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/BasicPullResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/BasicPullResponseHandler.java @@ -40,18 +40,7 @@ import static org.neo4j.driver.internal.messaging.request.DiscardMessage.newDiscardAllMessage; /** - * In this class we have a hidden state machine. - * Here is how it looks like: - * | | DONE | FAILED | STREAMING | READY | CANCELED | - * |--------------------|------|--------|--------------------------------|--------------------|----------------| - * | request | X | X | toRequest++ ->STREAMING | PULL ->STREAMING | X | - * | cancel | X | X | ->CANCELED | DISCARD ->CANCELED | ->CANCELED | - * | onSuccess has_more | X | X | ->READY request if toRequest>0 | X | ->READY cancel | - * | onSuccess | X | X | summary ->DONE | X | summary ->DONE | - * | onRecord | X | X | yield record ->STREAMING | X | ->CANCELED | - * | onFailure | X | X | ->FAILED | X | ->FAILED | - * - * Currently the error state (marked with X on the table above) might not be enforced. + * Provides basic handling of pull responses from sever. The state is managed by {@link State}. */ public class BasicPullResponseHandler implements PullResponseHandler { @@ -61,156 +50,129 @@ public class BasicPullResponseHandler implements PullResponseHandler protected final Connection connection; private final PullResponseCompletionListener completionListener; - private Status status = Status.READY; + private State state; private long toRequest; private BiConsumer recordConsumer = null; - private BiConsumer summaryConsumer = null; + private BiConsumer summaryConsumer = null; - public BasicPullResponseHandler(Query query, RunResponseHandler runResponseHandler, Connection connection, MetadataExtractor metadataExtractor, - PullResponseCompletionListener completionListener ) + public BasicPullResponseHandler( Query query, RunResponseHandler runResponseHandler, + Connection connection, MetadataExtractor metadataExtractor, + PullResponseCompletionListener completionListener ) { - this.query = requireNonNull(query); + this.query = requireNonNull( query ); this.runResponseHandler = requireNonNull( runResponseHandler ); this.metadataExtractor = requireNonNull( metadataExtractor ); this.connection = requireNonNull( connection ); this.completionListener = requireNonNull( completionListener ); + + this.state = State.READY_STATE; } @Override public synchronized void onSuccess( Map metadata ) { assertRecordAndSummaryConsumerInstalled(); - if ( metadata.getOrDefault( "has_more", BooleanValue.FALSE ).asBoolean() ) - { - handleSuccessWithHasMore(); - } - else - { - handleSuccessWithSummary( metadata ); - } + state.onSuccess( this, metadata ); } @Override public synchronized void onFailure( Throwable error ) { assertRecordAndSummaryConsumerInstalled(); - status = Status.FAILED; - completionListener.afterFailure( error ); - - complete( extractResultSummary( emptyMap() ), error ); + state.onFailure( this, error ); } @Override public synchronized void onRecord( Value[] fields ) { assertRecordAndSummaryConsumerInstalled(); - if ( isStreaming() ) - { - Record record = new InternalRecord( runResponseHandler.queryKeys(), fields ); - recordConsumer.accept( record, null ); - } + state.onRecord( this, fields ); } @Override public synchronized void request( long size ) { assertRecordAndSummaryConsumerInstalled(); - if ( isStreamingPaused() ) - { - status = Status.STREAMING; - connection.writeAndFlush( new PullMessage( size, runResponseHandler.queryId() ), this ); - } - else if ( isStreaming() ) - { - addToRequest( size ); - } + state.request( this, size ); } @Override public synchronized void cancel() { assertRecordAndSummaryConsumerInstalled(); - if ( isStreamingPaused() ) - { - status = Status.CANCELED; - // Reactive API does not provide a way to discard N. Only discard all. - connection.writeAndFlush( newDiscardAllMessage( runResponseHandler.queryId() ), this ); - } - else if ( isStreaming() ) - { - status = Status.CANCELED; - } - // no need to change status if it is already done + state.cancel( this ); } - @Override - public synchronized void installSummaryConsumer( BiConsumer summaryConsumer ) + protected void completeWithFailure( Throwable error ) { - if( this.summaryConsumer != null ) - { - throw new IllegalStateException( "Summary consumer already installed." ); - } - this.summaryConsumer = summaryConsumer; + completionListener.afterFailure( error ); + complete( extractResultSummary( emptyMap() ), error ); } - @Override - public synchronized void installRecordConsumer( BiConsumer recordConsumer ) + protected void completeWithSuccess( Map metadata ) { - if( this.recordConsumer != null ) - { - throw new IllegalStateException( "Record consumer already installed." ); - } - this.recordConsumer = recordConsumer; + completionListener.afterSuccess( metadata ); + ResultSummary summary = extractResultSummary( metadata ); + + complete( summary, null ); } - private boolean isStreaming() + protected void successHasMore() { - return status == Status.STREAMING; + if ( toRequest > 0 || toRequest == UNLIMITED_FETCH_SIZE ) + { + request( toRequest ); + toRequest = 0; + } + // summary consumer use (null, null) to identify done handling of success with has_more + summaryConsumer.accept( null, null ); } - private boolean isStreamingPaused() + protected void handleRecord( Value[] fields ) { - return status == Status.READY; + Record record = new InternalRecord( runResponseHandler.queryKeys(), fields ); + recordConsumer.accept( record, null ); } - protected boolean isDone() + protected void writePull( long n ) { - return status == Status.SUCCEEDED || status == Status.FAILED; + connection.writeAndFlush( new PullMessage( n, runResponseHandler.queryId() ), this ); } - private void handleSuccessWithSummary( Map metadata ) + protected void discardAll() { - status = Status.SUCCEEDED; - completionListener.afterSuccess( metadata ); - ResultSummary summary = extractResultSummary( metadata ); - - complete( summary, null ); + connection.writeAndFlush( newDiscardAllMessage( runResponseHandler.queryId() ), this ); } - private void handleSuccessWithHasMore() + @Override + public synchronized void installSummaryConsumer( BiConsumer summaryConsumer ) { - if ( this.status == Status.CANCELED ) + if ( this.summaryConsumer != null ) { - this.status = Status.READY; // cancel request accepted. - cancel(); + throw new IllegalStateException( "Summary consumer already installed." ); } - else if ( this.status == Status.STREAMING ) + this.summaryConsumer = summaryConsumer; + } + + @Override + public synchronized void installRecordConsumer( BiConsumer recordConsumer ) + { + if ( this.recordConsumer != null ) { - this.status = Status.READY; - if ( toRequest > 0 || toRequest == UNLIMITED_FETCH_SIZE ) - { - request( toRequest ); - toRequest = 0; - } - // summary consumer use (null, null) to identify done handling of success with has_more - summaryConsumer.accept( null, null ); + throw new IllegalStateException( "Record consumer already installed." ); } + this.recordConsumer = recordConsumer; + } + + protected boolean isDone() + { + return state.equals( State.SUCCEEDED_STATE ) || state.equals( State.FAILURE_STATE ); } private ResultSummary extractResultSummary( Map metadata ) { long resultAvailableAfter = runResponseHandler.resultAvailableAfter(); - return metadataExtractor.extractSummary(query, connection, resultAvailableAfter, metadata ); + return metadataExtractor.extractSummary( query, connection, resultAvailableAfter, metadata ); } private void addToRequest( long toAdd ) @@ -239,15 +201,15 @@ private void addToRequest( long toAdd ) private void assertRecordAndSummaryConsumerInstalled() { - if( isDone() ) + if ( isDone() ) { // no need to check if we've finished. return; } - if( recordConsumer == null || summaryConsumer == null ) + if ( recordConsumer == null || summaryConsumer == null ) { - throw new IllegalStateException( format("Access record stream without record consumer and/or summary consumer. " + - "Record consumer=%s, Summary consumer=%s", recordConsumer, summaryConsumer) ); + throw new IllegalStateException( format( "Access record stream without record consumer and/or summary consumer. " + + "Record consumer=%s, Summary consumer=%s", recordConsumer, summaryConsumer ) ); } } @@ -267,13 +229,217 @@ private void dispose() this.summaryConsumer = null; } - protected Status status() + protected State state() { - return this.status; + return state; } - protected void status( Status status ) + protected void state( State state ) { - this.status = status; + this.state = state; + } + + enum State + { + READY_STATE + { + @Override + void onSuccess( BasicPullResponseHandler context, Map metadata ) + { + context.state( SUCCEEDED_STATE ); + context.completeWithSuccess( metadata ); + } + + @Override + void onFailure( BasicPullResponseHandler context, Throwable error ) + { + context.state( FAILURE_STATE ); + context.completeWithFailure( error ); + } + + @Override + void onRecord( BasicPullResponseHandler context, Value[] fields ) + { + context.state( READY_STATE ); + } + + @Override + void request( BasicPullResponseHandler context, long n ) + { + context.state( STREAMING_STATE ); + context.writePull( n ); + } + + @Override + void cancel( BasicPullResponseHandler context ) + { + context.state( CANCELLED_STATE ); + context.discardAll(); + } + }, + STREAMING_STATE + { + @Override + void onSuccess( BasicPullResponseHandler context, Map metadata ) + { + if ( metadata.getOrDefault( "has_more", BooleanValue.FALSE ).asBoolean() ) + { + context.state( READY_STATE ); + context.successHasMore(); + } + else + { + context.state( SUCCEEDED_STATE ); + context.completeWithSuccess( metadata ); + } + } + + @Override + void onFailure( BasicPullResponseHandler context, Throwable error ) + { + context.state( FAILURE_STATE ); + context.completeWithFailure( error ); + } + + @Override + void onRecord( BasicPullResponseHandler context, Value[] fields ) + { + context.state( STREAMING_STATE ); + context.handleRecord( fields ); + } + + @Override + void request( BasicPullResponseHandler context, long n ) + { + context.state( STREAMING_STATE ); + context.addToRequest( n ); + } + + @Override + void cancel( BasicPullResponseHandler context ) + { + context.state( CANCELLED_STATE ); + } + }, + CANCELLED_STATE + { + @Override + void onSuccess( BasicPullResponseHandler context, Map metadata ) + { + if ( metadata.getOrDefault( "has_more", BooleanValue.FALSE ).asBoolean() ) + { + context.state( CANCELLED_STATE ); + context.discardAll(); + } + else + { + context.state( SUCCEEDED_STATE ); + context.completeWithSuccess( metadata ); + } + } + + @Override + void onFailure( BasicPullResponseHandler context, Throwable error ) + { + context.state( FAILURE_STATE ); + context.completeWithFailure( error ); + } + + @Override + void onRecord( BasicPullResponseHandler context, Value[] fields ) + { + context.state( CANCELLED_STATE ); + } + + @Override + void request( BasicPullResponseHandler context, long n ) + { + context.state( CANCELLED_STATE ); + } + + @Override + void cancel( BasicPullResponseHandler context ) + { + context.state( CANCELLED_STATE ); + } + }, + SUCCEEDED_STATE + { + @Override + void onSuccess( BasicPullResponseHandler context, Map metadata ) + { + context.state( SUCCEEDED_STATE ); + context.completeWithSuccess( metadata ); + } + + @Override + void onFailure( BasicPullResponseHandler context, Throwable error ) + { + context.state( FAILURE_STATE ); + context.completeWithFailure( error ); + } + + @Override + void onRecord( BasicPullResponseHandler context, Value[] fields ) + { + context.state( SUCCEEDED_STATE ); + } + + @Override + void request( BasicPullResponseHandler context, long n ) + { + context.state( SUCCEEDED_STATE ); + } + + @Override + void cancel( BasicPullResponseHandler context ) + { + context.state( SUCCEEDED_STATE ); + } + }, + FAILURE_STATE + { + @Override + void onSuccess( BasicPullResponseHandler context, Map metadata ) + { + context.state( SUCCEEDED_STATE ); + context.completeWithSuccess( metadata ); + } + + @Override + void onFailure( BasicPullResponseHandler context, Throwable error ) + { + context.state( FAILURE_STATE ); + context.completeWithFailure( error ); + } + + @Override + void onRecord( BasicPullResponseHandler context, Value[] fields ) + { + context.state( FAILURE_STATE ); + } + + @Override + void request( BasicPullResponseHandler context, long n ) + { + context.state( FAILURE_STATE ); + } + + @Override + void cancel( BasicPullResponseHandler context ) + { + context.state( FAILURE_STATE ); + } + }; + + abstract void onSuccess( BasicPullResponseHandler context, Map metadata ); + + abstract void onFailure( BasicPullResponseHandler context, Throwable error ); + + abstract void onRecord( BasicPullResponseHandler context, Value[] fields ); + + abstract void request( BasicPullResponseHandler context, long n ); + + abstract void cancel( BasicPullResponseHandler context ); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/PullResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/PullResponseHandler.java index 5ca4130ab8..26cf3b7bb5 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/PullResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/PullResponseHandler.java @@ -44,12 +44,4 @@ public interface PullResponseHandler extends ResponseHandler, Subscription */ void installSummaryConsumer( BiConsumer summaryConsumer ); - enum Status - { - SUCCEEDED, // successfully completed - FAILED, // failed - CANCELED, // canceled - STREAMING, // streaming records - READY // steaming is paused. ready to accept request or cancel commands from user - } } diff --git a/driver/src/test/java/org/neo4j/driver/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/integration/SessionIT.java index caac156406..e283cf1191 100644 --- a/driver/src/test/java/org/neo4j/driver/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/integration/SessionIT.java @@ -41,28 +41,28 @@ import org.neo4j.driver.Config; import org.neo4j.driver.Driver; import org.neo4j.driver.GraphDatabase; +import org.neo4j.driver.QueryRunner; import org.neo4j.driver.Record; -import org.neo4j.driver.Session; import org.neo4j.driver.Result; -import org.neo4j.driver.QueryRunner; +import org.neo4j.driver.Session; import org.neo4j.driver.Transaction; import org.neo4j.driver.TransactionWork; import org.neo4j.driver.exceptions.AuthenticationException; import org.neo4j.driver.exceptions.ClientException; +import org.neo4j.driver.exceptions.ResultConsumedException; import org.neo4j.driver.exceptions.ServiceUnavailableException; import org.neo4j.driver.exceptions.TransientException; import org.neo4j.driver.internal.DriverFactory; import org.neo4j.driver.internal.cluster.RoutingSettings; -import org.neo4j.driver.exceptions.ResultConsumedException; import org.neo4j.driver.internal.retry.RetrySettings; import org.neo4j.driver.internal.util.DisabledOnNeo4jWith; import org.neo4j.driver.internal.util.DriverFactoryWithFixedRetryLogic; import org.neo4j.driver.internal.util.DriverFactoryWithOneEventLoopThread; import org.neo4j.driver.internal.util.EnabledOnNeo4jWith; -import org.neo4j.driver.reactive.RxSession; import org.neo4j.driver.reactive.RxResult; -import org.neo4j.driver.summary.ResultSummary; +import org.neo4j.driver.reactive.RxSession; import org.neo4j.driver.summary.QueryType; +import org.neo4j.driver.summary.ResultSummary; import org.neo4j.driver.util.DatabaseExtension; import org.neo4j.driver.util.ParallelizableIT; import org.neo4j.driver.util.TestUtil; @@ -942,6 +942,7 @@ void shouldNotAllowAccessingRecordsAfterSessionClosed() } @Test + @DisabledOnNeo4jWith( BOLT_V4 ) void shouldAllowToConsumeRecordsSlowlyAndCloseSession() throws InterruptedException { Session session = neo4j.driver().session(); diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/pulln/BasicPullResponseHandlerTestBase.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/pulln/BasicPullResponseHandlerTestBase.java index 01e500b8dc..bdf359da35 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/handlers/pulln/BasicPullResponseHandlerTestBase.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/pulln/BasicPullResponseHandlerTestBase.java @@ -27,7 +27,6 @@ import java.util.stream.Stream; import org.neo4j.driver.internal.BoltServerAddress; -import org.neo4j.driver.internal.handlers.pulln.PullResponseHandler.Status; import org.neo4j.driver.internal.messaging.request.DiscardMessage; import org.neo4j.driver.internal.messaging.request.PullMessage; import org.neo4j.driver.internal.spi.Connection; @@ -45,25 +44,23 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import static org.neo4j.driver.internal.handlers.pulln.PullResponseHandler.Status.CANCELED; -import static org.neo4j.driver.internal.handlers.pulln.PullResponseHandler.Status.SUCCEEDED; -import static org.neo4j.driver.internal.handlers.pulln.PullResponseHandler.Status.FAILED; -import static org.neo4j.driver.internal.handlers.pulln.PullResponseHandler.Status.READY; -import static org.neo4j.driver.internal.handlers.pulln.PullResponseHandler.Status.STREAMING; abstract class BasicPullResponseHandlerTestBase { - protected abstract void shouldHandleSuccessWithSummary( Status status ); - protected abstract void shouldHandleFailure( Status status ); + protected abstract void shouldHandleSuccessWithSummary( BasicPullResponseHandler.State state ); + + protected abstract void shouldHandleFailure( BasicPullResponseHandler.State state ); + protected abstract BasicPullResponseHandler newResponseHandlerWithStatus( Connection conn, BiConsumer recordConsumer, - BiConsumer summaryConsumer, Status status ); + BiConsumer summaryConsumer, + BasicPullResponseHandler.State state ); // on success with summary @ParameterizedTest @MethodSource( "allStatus" ) - void shouldSuccessWithSummary( Status status ) throws Throwable + void shouldSuccessWithSummary( BasicPullResponseHandler.State state ) throws Throwable { - shouldHandleSuccessWithSummary( status ); + shouldHandleSuccessWithSummary( state ); } // on success with has_more @@ -72,7 +69,7 @@ void shouldRequestMoreWithHasMore() throws Throwable { // Given a handler in streaming state Connection conn = mockConnection(); - BasicPullResponseHandler handler = newResponseHandlerWithStatus( conn, STREAMING ); + BasicPullResponseHandler handler = newResponseHandlerWithStatus( conn, BasicPullResponseHandler.State.STREAMING_STATE ); // When handler.request( 100 ); // I append a request to ask for more @@ -81,7 +78,7 @@ void shouldRequestMoreWithHasMore() throws Throwable // Then verify( conn ).writeAndFlush( any( PullMessage.class ), eq( handler ) ); - assertThat( handler.status(), equalTo( STREAMING ) ); + assertThat( handler.state(), equalTo( BasicPullResponseHandler.State.STREAMING_STATE ) ); } @Test @@ -91,7 +88,7 @@ void shouldInformSummaryConsumerSuccessWithHasMore() throws Throwable Connection conn = mockConnection(); BiConsumer recordConsumer = mock( BiConsumer.class ); BiConsumer summaryConsumer = mock( BiConsumer.class ); - BasicPullResponseHandler handler = newResponseHandlerWithStatus( conn, recordConsumer, summaryConsumer, STREAMING ); + BasicPullResponseHandler handler = newResponseHandlerWithStatus( conn, recordConsumer, summaryConsumer, BasicPullResponseHandler.State.STREAMING_STATE ); // When handler.onSuccess( metaWithHasMoreEqualsTrue() ); @@ -100,7 +97,7 @@ void shouldInformSummaryConsumerSuccessWithHasMore() throws Throwable verifyNoMoreInteractions( conn ); verifyNoMoreInteractions( recordConsumer ); verify( summaryConsumer ).accept( null, null ); - assertThat( handler.status(), equalTo( READY ) ); + assertThat( handler.state(), equalTo( BasicPullResponseHandler.State.READY_STATE ) ); } @Test @@ -108,20 +105,20 @@ void shouldDiscardIfStreamingIsCanceled() throws Throwable { // Given a handler in streaming state Connection conn = mockConnection(); - BasicPullResponseHandler handler = newResponseHandlerWithStatus( conn, CANCELED ); + BasicPullResponseHandler handler = newResponseHandlerWithStatus( conn, BasicPullResponseHandler.State.CANCELLED_STATE ); handler.onSuccess( metaWithHasMoreEqualsTrue() ); // Then verify( conn ).writeAndFlush( any( DiscardMessage.class ), eq( handler ) ); - assertThat( handler.status(), equalTo( CANCELED ) ); + assertThat( handler.state(), equalTo( BasicPullResponseHandler.State.CANCELLED_STATE ) ); } // on failure @ParameterizedTest @MethodSource( "allStatus" ) - void shouldErrorToRecordAndSummaryConsumer( Status status ) throws Throwable + void shouldErrorToRecordAndSummaryConsumer( BasicPullResponseHandler.State state ) throws Throwable { - shouldHandleFailure( status ); + shouldHandleFailure( state ); } // on record @@ -132,7 +129,7 @@ void shouldReportRecordInStreaming() throws Throwable Connection conn = mockConnection(); BiConsumer recordConsumer = mock( BiConsumer.class ); BiConsumer summaryConsumer = mock( BiConsumer.class ); - BasicPullResponseHandler handler = newResponseHandlerWithStatus( conn, recordConsumer, summaryConsumer, STREAMING ); + BasicPullResponseHandler handler = newResponseHandlerWithStatus( conn, recordConsumer, summaryConsumer, BasicPullResponseHandler.State.STREAMING_STATE ); // When handler.onRecord( new Value[0] ); @@ -141,18 +138,18 @@ void shouldReportRecordInStreaming() throws Throwable verify( recordConsumer ).accept( any( Record.class ), eq( null ) ); verifyNoMoreInteractions( summaryConsumer ); verifyNoMoreInteractions( conn ); - assertThat( handler.status(), equalTo( STREAMING ) ); + assertThat( handler.state(), equalTo( BasicPullResponseHandler.State.STREAMING_STATE ) ); } @ParameterizedTest @MethodSource( "allStatusExceptStreaming" ) - void shouldNotReportRecordWhenNotStreaming( Status status ) throws Throwable + void shouldNotReportRecordWhenNotStreaming( BasicPullResponseHandler.State state ) throws Throwable { // Given a handler in streaming state Connection conn = mockConnection(); BiConsumer recordConsumer = mock( BiConsumer.class ); BiConsumer summaryConsumer = mock( BiConsumer.class ); - BasicPullResponseHandler handler = newResponseHandlerWithStatus( conn, recordConsumer, summaryConsumer, status ); + BasicPullResponseHandler handler = newResponseHandlerWithStatus( conn, recordConsumer, summaryConsumer, state ); // When handler.onRecord( new Value[0] ); @@ -160,7 +157,7 @@ void shouldNotReportRecordWhenNotStreaming( Status status ) throws Throwable // Then verifyNoMoreInteractions( recordConsumer ); verifyNoMoreInteractions( summaryConsumer ); - assertThat( handler.status(), equalTo( status ) ); + assertThat( handler.state(), equalTo( state ) ); } // request @@ -169,13 +166,13 @@ void shouldStayInStreaming() throws Throwable { // Given Connection conn = mockConnection(); - BasicPullResponseHandler handler = newResponseHandlerWithStatus( conn, STREAMING ); + BasicPullResponseHandler handler = newResponseHandlerWithStatus( conn, BasicPullResponseHandler.State.STREAMING_STATE ); // When handler.request( 100 ); // Then - assertThat( handler.status(), equalTo( STREAMING ) ); + assertThat( handler.state(), equalTo( BasicPullResponseHandler.State.STREAMING_STATE ) ); } @Test @@ -183,14 +180,14 @@ void shouldPullAndSwitchStreamingInReady() throws Throwable { // Given Connection conn = mockConnection(); - BasicPullResponseHandler handler = newResponseHandlerWithStatus( conn, READY ); + BasicPullResponseHandler handler = newResponseHandlerWithStatus( conn, BasicPullResponseHandler.State.READY_STATE ); // When handler.request( 100 ); // Then verify( conn ).writeAndFlush( any( PullMessage.class ), eq( handler ) ); - assertThat( handler.status(), equalTo( STREAMING ) ); + assertThat( handler.state(), equalTo( BasicPullResponseHandler.State.STREAMING_STATE ) ); } // cancel @@ -199,14 +196,14 @@ void shouldStayInCancel() throws Throwable { // Given Connection conn = mockConnection(); - BasicPullResponseHandler handler = newResponseHandlerWithStatus( conn, CANCELED ); + BasicPullResponseHandler handler = newResponseHandlerWithStatus( conn, BasicPullResponseHandler.State.CANCELLED_STATE ); // When handler.cancel(); // Then verifyNoMoreInteractions( conn ); - assertThat( handler.status(), equalTo( CANCELED ) ); + assertThat( handler.state(), equalTo( BasicPullResponseHandler.State.CANCELLED_STATE ) ); } @Test @@ -214,14 +211,14 @@ void shouldSwitchFromStreamingToCancel() throws Throwable { // Given Connection conn = mockConnection(); - BasicPullResponseHandler handler = newResponseHandlerWithStatus( conn, STREAMING ); + BasicPullResponseHandler handler = newResponseHandlerWithStatus( conn, BasicPullResponseHandler.State.STREAMING_STATE ); // When handler.cancel(); // Then verifyNoMoreInteractions( conn ); - assertThat( handler.status(), equalTo( CANCELED ) ); + assertThat( handler.state(), equalTo( BasicPullResponseHandler.State.CANCELLED_STATE ) ); } @Test @@ -229,14 +226,14 @@ void shouldSwitchFromReadyToCancel() throws Throwable { // Given Connection conn = mockConnection(); - BasicPullResponseHandler handler = newResponseHandlerWithStatus( conn, READY ); + BasicPullResponseHandler handler = newResponseHandlerWithStatus( conn, BasicPullResponseHandler.State.READY_STATE ); // When handler.cancel(); // Then verify( conn ).writeAndFlush( any( DiscardMessage.class ), eq( handler ) ); - assertThat( handler.status(), equalTo( CANCELED ) ); + assertThat( handler.state(), equalTo( BasicPullResponseHandler.State.CANCELLED_STATE ) ); } static Connection mockConnection() @@ -247,11 +244,11 @@ static Connection mockConnection() return conn; } - private BasicPullResponseHandler newResponseHandlerWithStatus( Connection conn, Status status ) + private BasicPullResponseHandler newResponseHandlerWithStatus( Connection conn, BasicPullResponseHandler.State state ) { BiConsumer recordConsumer = mock( BiConsumer.class ); BiConsumer summaryConsumer = mock( BiConsumer.class ); - return newResponseHandlerWithStatus( conn, recordConsumer, summaryConsumer, status ); + return newResponseHandlerWithStatus( conn, recordConsumer, summaryConsumer, state ); } private static HashMap metaWithHasMoreEqualsTrue() @@ -261,13 +258,16 @@ private static HashMap metaWithHasMoreEqualsTrue() return meta; } - private static Stream allStatusExceptStreaming() + private static Stream allStatusExceptStreaming() { - return Stream.of( SUCCEEDED, FAILED, CANCELED, READY ); + return Stream.of( BasicPullResponseHandler.State.SUCCEEDED_STATE, BasicPullResponseHandler.State.FAILURE_STATE, + BasicPullResponseHandler.State.CANCELLED_STATE, BasicPullResponseHandler.State.READY_STATE ); } - private static Stream allStatus() + private static Stream allStatus() { - return Stream.of( SUCCEEDED, FAILED, CANCELED, STREAMING, READY ); + return Stream.of( BasicPullResponseHandler.State.SUCCEEDED_STATE, BasicPullResponseHandler.State.FAILURE_STATE, + BasicPullResponseHandler.State.CANCELLED_STATE, BasicPullResponseHandler.State.READY_STATE, + BasicPullResponseHandler.State.STREAMING_STATE ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/pulln/SessionPullResponseCompletionListenerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/pulln/SessionPullResponseCompletionListenerTest.java index 927fef48ca..61105648c4 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/handlers/pulln/SessionPullResponseCompletionListenerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/pulln/SessionPullResponseCompletionListenerTest.java @@ -26,7 +26,6 @@ import org.neo4j.driver.internal.BookmarkHolder; import org.neo4j.driver.internal.handlers.RunResponseHandler; import org.neo4j.driver.internal.handlers.SessionPullResponseCompletionListener; -import org.neo4j.driver.internal.handlers.pulln.PullResponseHandler.Status; import org.neo4j.driver.internal.messaging.v4.BoltProtocolV4; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.summary.ResultSummary; @@ -37,19 +36,17 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import static org.neo4j.driver.internal.handlers.pulln.PullResponseHandler.Status.FAILED; class SessionPullResponseCompletionListenerTest extends BasicPullResponseHandlerTestBase { - @Override - protected void shouldHandleSuccessWithSummary( Status status ) + protected void shouldHandleSuccessWithSummary( BasicPullResponseHandler.State state ) { // Given Connection conn = mockConnection(); BiConsumer recordConsumer = mock( BiConsumer.class ); BiConsumer summaryConsumer = mock( BiConsumer.class ); BookmarkHolder bookmarkHolder = mock( BookmarkHolder.class ); - PullResponseHandler handler = newSessionResponseHandler( conn, recordConsumer, summaryConsumer, bookmarkHolder, status); + PullResponseHandler handler = newSessionResponseHandler( conn, recordConsumer, summaryConsumer, bookmarkHolder, state ); // When handler.onSuccess( Collections.emptyMap() ); @@ -63,20 +60,20 @@ protected void shouldHandleSuccessWithSummary( Status status ) } @Override - protected void shouldHandleFailure( Status status ) + protected void shouldHandleFailure( BasicPullResponseHandler.State state ) { // Given Connection conn = mockConnection(); BiConsumer recordConsumer = mock( BiConsumer.class ); BiConsumer summaryConsumer = mock( BiConsumer.class ); - BasicPullResponseHandler handler = newResponseHandlerWithStatus( conn, recordConsumer, summaryConsumer, status ); + BasicPullResponseHandler handler = newResponseHandlerWithStatus( conn, recordConsumer, summaryConsumer, state ); // When RuntimeException error = new RuntimeException( "I am an error" ); handler.onFailure( error ); // Then - assertThat( handler.status(), equalTo( FAILED ) ); + assertThat( handler.state(), equalTo( BasicPullResponseHandler.State.FAILURE_STATE ) ); verify( conn ).release(); verify( recordConsumer ).accept( null, error ); verify( summaryConsumer ).accept( any( ResultSummary.class ), eq( error ) ); @@ -84,14 +81,15 @@ protected void shouldHandleFailure( Status status ) @Override protected BasicPullResponseHandler newResponseHandlerWithStatus( Connection conn, BiConsumer recordConsumer, - BiConsumer summaryConsumer, Status status ) + BiConsumer summaryConsumer, BasicPullResponseHandler.State state ) { BookmarkHolder bookmarkHolder = BookmarkHolder.NO_OP; - return newSessionResponseHandler( conn, recordConsumer, summaryConsumer, bookmarkHolder, status ); + return newSessionResponseHandler( conn, recordConsumer, summaryConsumer, bookmarkHolder, state ); } private static BasicPullResponseHandler newSessionResponseHandler( Connection conn, BiConsumer recordConsumer, - BiConsumer summaryConsumer, BookmarkHolder bookmarkHolder, Status status ) + BiConsumer summaryConsumer, BookmarkHolder bookmarkHolder, + BasicPullResponseHandler.State state ) { RunResponseHandler runHandler = mock( RunResponseHandler.class ); SessionPullResponseCompletionListener listener = new SessionPullResponseCompletionListener( conn, bookmarkHolder ); @@ -101,7 +99,7 @@ private static BasicPullResponseHandler newSessionResponseHandler( Connection co handler.installRecordConsumer( recordConsumer ); handler.installSummaryConsumer( summaryConsumer ); - handler.status( status ); + handler.state( state ); return handler; } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/pulln/TransactionPullResponseCompletionListenerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/pulln/TransactionPullResponseCompletionListenerTest.java index 341b524b6c..52c217eb8b 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/handlers/pulln/TransactionPullResponseCompletionListenerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/pulln/TransactionPullResponseCompletionListenerTest.java @@ -36,45 +36,43 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import static org.neo4j.driver.internal.handlers.pulln.PullResponseHandler.Status.FAILED; -import static org.neo4j.driver.internal.handlers.pulln.PullResponseHandler.Status.SUCCEEDED; public class TransactionPullResponseCompletionListenerTest extends BasicPullResponseHandlerTestBase { @Override - protected void shouldHandleSuccessWithSummary( PullResponseHandler.Status status ) + protected void shouldHandleSuccessWithSummary( BasicPullResponseHandler.State state ) { // Given Connection conn = mockConnection(); BiConsumer recordConsumer = mock( BiConsumer.class ); BiConsumer summaryConsumer = mock( BiConsumer.class ); - BasicPullResponseHandler handler = newResponseHandlerWithStatus( conn, recordConsumer, summaryConsumer, status); + BasicPullResponseHandler handler = newResponseHandlerWithStatus( conn, recordConsumer, summaryConsumer, state ); // When handler.onSuccess( Collections.emptyMap() ); // Then - assertThat( handler.status(), equalTo( SUCCEEDED ) ); + assertThat( handler.state(), equalTo( BasicPullResponseHandler.State.SUCCEEDED_STATE )); verify( recordConsumer ).accept( null, null ); verify( summaryConsumer ).accept( any( ResultSummary.class ), eq( null ) ); } @Override - protected void shouldHandleFailure( PullResponseHandler.Status status ) + protected void shouldHandleFailure( BasicPullResponseHandler.State state ) { // Given Connection conn = mockConnection(); BiConsumer recordConsumer = mock( BiConsumer.class ); BiConsumer summaryConsumer = mock( BiConsumer.class ); UnmanagedTransaction tx = mock( UnmanagedTransaction.class ); - BasicPullResponseHandler handler = newTxResponseHandler( conn, recordConsumer, summaryConsumer, tx, status ); + BasicPullResponseHandler handler = newTxResponseHandler( conn, recordConsumer, summaryConsumer, tx, state ); // When RuntimeException error = new RuntimeException( "I am an error" ); handler.onFailure( error ); // Then - assertThat( handler.status(), equalTo( FAILED ) ); + assertThat( handler.state(), equalTo( BasicPullResponseHandler.State.FAILURE_STATE ) ); verify( tx ).markTerminated(); verify( recordConsumer ).accept( null, error ); verify( summaryConsumer ).accept( any( ResultSummary.class ), eq( error ) ); @@ -82,14 +80,15 @@ protected void shouldHandleFailure( PullResponseHandler.Status status ) @Override protected BasicPullResponseHandler newResponseHandlerWithStatus( Connection conn, BiConsumer recordConsumer, - BiConsumer summaryConsumer, PullResponseHandler.Status status ) + BiConsumer summaryConsumer, BasicPullResponseHandler.State state ) { UnmanagedTransaction tx = mock( UnmanagedTransaction.class ); - return newTxResponseHandler( conn, recordConsumer, summaryConsumer, tx, status ); + return newTxResponseHandler( conn, recordConsumer, summaryConsumer, tx, state ); } - private static BasicPullResponseHandler newTxResponseHandler(Connection conn, BiConsumer recordConsumer, - BiConsumer summaryConsumer, UnmanagedTransaction tx, PullResponseHandler.Status status ) + private static BasicPullResponseHandler newTxResponseHandler( Connection conn, BiConsumer recordConsumer, + BiConsumer summaryConsumer, UnmanagedTransaction tx, + BasicPullResponseHandler.State state ) { RunResponseHandler runHandler = mock( RunResponseHandler.class ); TransactionPullResponseCompletionListener listener = new TransactionPullResponseCompletionListener( tx ); @@ -99,7 +98,7 @@ private static BasicPullResponseHandler newTxResponseHandler(Connection conn, Bi handler.installRecordConsumer( recordConsumer ); handler.installSummaryConsumer( summaryConsumer ); - handler.status( status ); + handler.state( state ); return handler; } }