From 4b8d4dd2b39edcf94a92f9284edd34573cbd3183 Mon Sep 17 00:00:00 2001 From: Dmitriy Tverdiakov Date: Thu, 16 Sep 2021 20:57:08 +0100 Subject: [PATCH] Prevent duplicate summary error reporting on session closure in reactive Reactive driver might emit the same error both on unconsumed result stream disposal and session closure, this update is intended to fix this. --- .../internal/cursor/RxResultCursorImpl.java | 29 ++++++---- .../cursor/RxResultCursorImplTest.java | 58 ++++++++++++++++++- .../backend/messages/requests/StartTest.java | 6 -- 3 files changed, 76 insertions(+), 17 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java b/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java index ee875cc5b9..5f85128a21 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java @@ -37,20 +37,22 @@ public class RxResultCursorImpl implements RxResultCursor { - static final BiConsumer DISCARD_RECORD_CONSUMER = ( record, throwable ) -> {/*do nothing*/}; + static final BiConsumer DISCARD_RECORD_CONSUMER = ( record, throwable ) -> + {/*do nothing*/}; private final RunResponseHandler runHandler; private final PullResponseHandler pullHandler; private final Throwable runResponseError; private final CompletableFuture summaryFuture = new CompletableFuture<>(); + private boolean summaryFutureExposed; private boolean resultConsumed; private RecordConsumerStatus consumerStatus = NOT_INSTALLED; - public RxResultCursorImpl(RunResponseHandler runHandler, PullResponseHandler pullHandler ) + public RxResultCursorImpl( RunResponseHandler runHandler, PullResponseHandler pullHandler ) { this( null, runHandler, pullHandler ); } - public RxResultCursorImpl(Throwable runError, RunResponseHandler runHandler, PullResponseHandler pullHandler ) + public RxResultCursorImpl( Throwable runError, RunResponseHandler runHandler, PullResponseHandler pullHandler ) { Objects.requireNonNull( runHandler ); Objects.requireNonNull( pullHandler ); @@ -105,7 +107,8 @@ public void cancel() public CompletionStage discardAllFailureAsync() { // calling this method will enforce discarding record stream and finish running cypher query - return summaryAsync().thenApply( summary -> (Throwable) null ).exceptionally( error -> error ); + return summaryStage().thenApply( summary -> (Throwable) null ) + .exceptionally( throwable -> summaryFutureExposed ? null : throwable ); } @Override @@ -122,6 +125,18 @@ public CompletionStage pullAllFailureAsync() @Override public CompletionStage summaryAsync() + { + summaryFutureExposed = true; + return summaryStage(); + } + + @Override + public boolean isDone() + { + return summaryFuture.isDone(); + } + + public CompletionStage summaryStage() { if ( !isDone() && !resultConsumed ) // the summary is called before record streaming { @@ -132,12 +147,6 @@ public CompletionStage summaryAsync() return this.summaryFuture; } - @Override - public boolean isDone() - { - return summaryFuture.isDone(); - } - private void assertRunCompletedSuccessfully() { if ( runResponseError != null ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/cursor/RxResultCursorImplTest.java b/driver/src/test/java/org/neo4j/driver/internal/cursor/RxResultCursorImplTest.java index 6d7d9242fa..cbb17380cf 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cursor/RxResultCursorImplTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cursor/RxResultCursorImplTest.java @@ -19,10 +19,13 @@ package org.neo4j.driver.internal.cursor; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; import java.util.function.BiConsumer; import org.neo4j.driver.exceptions.ResultConsumedException; @@ -30,14 +33,18 @@ import org.neo4j.driver.internal.handlers.pulln.PullResponseHandler; import org.neo4j.driver.internal.reactive.util.ListBasedPullHandler; import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.summary.ResultSummary; import static java.util.Arrays.asList; import static junit.framework.TestCase.assertTrue; -import static org.junit.Assert.assertFalse; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.neo4j.driver.Values.value; @@ -251,6 +258,55 @@ void shouldCancelIfNotPulled() assertFalse( cursor.isDone() ); } + @Test + void shouldPropagateSummaryErrorViaSummaryStageWhenItIsRetrievedExternally() throws ExecutionException, InterruptedException + { + // Given + RunResponseHandler runHandler = mock( RunResponseHandler.class ); + PullResponseHandler pullHandler = mock( PullResponseHandler.class ); + @SuppressWarnings( "unchecked" ) + ArgumentCaptor> summaryConsumerCaptor = ArgumentCaptor.forClass( BiConsumer.class ); + RxResultCursor cursor = new RxResultCursorImpl( runHandler, pullHandler ); + verify( pullHandler, times( 1 ) ).installSummaryConsumer( summaryConsumerCaptor.capture() ); + BiConsumer summaryConsumer = summaryConsumerCaptor.getValue(); + RuntimeException exception = mock( RuntimeException.class ); + + // When + CompletionStage summaryStage = cursor.summaryAsync(); + CompletionStage discardStage = cursor.discardAllFailureAsync(); + summaryConsumer.accept( null, exception ); + + // Then + verify( pullHandler ).installRecordConsumer( DISCARD_RECORD_CONSUMER ); + verify( pullHandler ).cancel(); + ExecutionException actualException = assertThrows( ExecutionException.class, () -> summaryStage.toCompletableFuture().get() ); + assertSame( exception, actualException.getCause() ); + assertNull( discardStage.toCompletableFuture().get() ); + } + + @Test + void shouldPropagateSummaryErrorViaDiscardStageWhenSummaryStageIsNotRetrievedExternally() throws ExecutionException, InterruptedException + { + // Given + RunResponseHandler runHandler = mock( RunResponseHandler.class ); + PullResponseHandler pullHandler = mock( PullResponseHandler.class ); + @SuppressWarnings( "unchecked" ) + ArgumentCaptor> summaryConsumerCaptor = ArgumentCaptor.forClass( BiConsumer.class ); + RxResultCursor cursor = new RxResultCursorImpl( runHandler, pullHandler ); + verify( pullHandler, times( 1 ) ).installSummaryConsumer( summaryConsumerCaptor.capture() ); + BiConsumer summaryConsumer = summaryConsumerCaptor.getValue(); + RuntimeException exception = mock( RuntimeException.class ); + + // When + CompletionStage discardStage = cursor.discardAllFailureAsync(); + summaryConsumer.accept( null, exception ); + + // Then + verify( pullHandler ).installRecordConsumer( DISCARD_RECORD_CONSUMER ); + verify( pullHandler ).cancel(); + assertSame( exception, discardStage.toCompletableFuture().get().getCause() ); + } + private static RunResponseHandler newRunResponseHandler( CompletableFuture runFuture ) { return new RunResponseHandler( runFuture, METADATA_EXTRACTOR, mock( Connection.class ), null ); diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java index 81a265759f..2323e5cd08 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java @@ -51,7 +51,6 @@ public class StartTest implements TestkitRequest REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_raises_error_on_session_run$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxRun\\.test_raises_error_on_tx(_func)?_run", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxRun\\.test_failed_tx_run_allows(_skipping)?_rollback", skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.NoRouting[^.]+\\.test_should_error_on_database_shutdown_using_tx_run$", "Session close throws error" ); skipMessage = "Requires investigation"; REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestProtocolVersions\\.test_server_agent", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestProtocolVersions\\.test_server_version", skipMessage ); @@ -65,17 +64,12 @@ public class StartTest implements TestkitRequest REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRoutingConnectionRecvTimeout\\.test_timeout_unmanaged_tx$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDisconnects\\.test_disconnect_session_on_tx_commit$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRunParameters\\..*$", skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_autocommit_transactions_should_support_timeout$", skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_fails_on_bad_syntax$", skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_fails_on_missing_parameter$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_iteration_nested$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_partial_iteration$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxFuncRun\\.test_iteration_nested$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxRun\\.test_broken_transaction_should_not_break_session$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxRun\\.test_does_not_update_last_bookmark_on_failure$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxRun\\.test_interwoven_queries$", skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxRun\\.test_should_not_run_valid_query_in_invalid_tx$", skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxRun\\.test_tx_timeout$", skipMessage ); } private StartTestBody data;