Skip to content

Commit cc70722

Browse files
committed
Prevent duplicate summary error reporting on session closure in reactive
In reactive mode an error may be reported both via result consumtion and session closure when driver discards unconsumed result stream. This update aims to prevent duplication error reporing on session closure if it has been reported during consumption.
1 parent 15a1f52 commit cc70722

File tree

3 files changed

+76
-17
lines changed

3 files changed

+76
-17
lines changed

driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,20 +37,22 @@
3737

3838
public class RxResultCursorImpl implements RxResultCursor
3939
{
40-
static final BiConsumer<Record,Throwable> DISCARD_RECORD_CONSUMER = ( record, throwable ) -> {/*do nothing*/};
40+
static final BiConsumer<Record,Throwable> DISCARD_RECORD_CONSUMER = ( record, throwable ) ->
41+
{/*do nothing*/};
4142
private final RunResponseHandler runHandler;
4243
private final PullResponseHandler pullHandler;
4344
private final Throwable runResponseError;
4445
private final CompletableFuture<ResultSummary> summaryFuture = new CompletableFuture<>();
46+
private boolean summaryFutureExposed;
4547
private boolean resultConsumed;
4648
private RecordConsumerStatus consumerStatus = NOT_INSTALLED;
4749

48-
public RxResultCursorImpl(RunResponseHandler runHandler, PullResponseHandler pullHandler )
50+
public RxResultCursorImpl( RunResponseHandler runHandler, PullResponseHandler pullHandler )
4951
{
5052
this( null, runHandler, pullHandler );
5153
}
5254

53-
public RxResultCursorImpl(Throwable runError, RunResponseHandler runHandler, PullResponseHandler pullHandler )
55+
public RxResultCursorImpl( Throwable runError, RunResponseHandler runHandler, PullResponseHandler pullHandler )
5456
{
5557
Objects.requireNonNull( runHandler );
5658
Objects.requireNonNull( pullHandler );
@@ -105,7 +107,8 @@ public void cancel()
105107
public CompletionStage<Throwable> discardAllFailureAsync()
106108
{
107109
// calling this method will enforce discarding record stream and finish running cypher query
108-
return summaryAsync().thenApply( summary -> (Throwable) null ).exceptionally( error -> error );
110+
return summaryStage().thenApply( summary -> (Throwable) null )
111+
.exceptionally( throwable -> summaryFutureExposed ? null : throwable );
109112
}
110113

111114
@Override
@@ -122,6 +125,18 @@ public CompletionStage<Throwable> pullAllFailureAsync()
122125

123126
@Override
124127
public CompletionStage<ResultSummary> summaryAsync()
128+
{
129+
summaryFutureExposed = true;
130+
return summaryStage();
131+
}
132+
133+
@Override
134+
public boolean isDone()
135+
{
136+
return summaryFuture.isDone();
137+
}
138+
139+
public CompletionStage<ResultSummary> summaryStage()
125140
{
126141
if ( !isDone() && !resultConsumed ) // the summary is called before record streaming
127142
{
@@ -132,12 +147,6 @@ public CompletionStage<ResultSummary> summaryAsync()
132147
return this.summaryFuture;
133148
}
134149

135-
@Override
136-
public boolean isDone()
137-
{
138-
return summaryFuture.isDone();
139-
}
140-
141150
private void assertRunCompletedSuccessfully()
142151
{
143152
if ( runResponseError != null )

driver/src/test/java/org/neo4j/driver/internal/cursor/RxResultCursorImplTest.java

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,25 +19,32 @@
1919
package org.neo4j.driver.internal.cursor;
2020

2121
import org.junit.jupiter.api.Test;
22+
import org.mockito.ArgumentCaptor;
2223

2324
import java.util.Collections;
2425
import java.util.List;
2526
import java.util.concurrent.CompletableFuture;
27+
import java.util.concurrent.CompletionStage;
28+
import java.util.concurrent.ExecutionException;
2629
import java.util.function.BiConsumer;
2730

2831
import org.neo4j.driver.exceptions.ResultConsumedException;
2932
import org.neo4j.driver.internal.handlers.RunResponseHandler;
3033
import org.neo4j.driver.internal.handlers.pulln.PullResponseHandler;
3134
import org.neo4j.driver.internal.reactive.util.ListBasedPullHandler;
3235
import org.neo4j.driver.internal.spi.Connection;
36+
import org.neo4j.driver.summary.ResultSummary;
3337

3438
import static java.util.Arrays.asList;
3539
import static junit.framework.TestCase.assertTrue;
36-
import static org.junit.Assert.assertFalse;
3740
import static org.junit.jupiter.api.Assertions.assertEquals;
41+
import static org.junit.jupiter.api.Assertions.assertFalse;
42+
import static org.junit.jupiter.api.Assertions.assertNull;
43+
import static org.junit.jupiter.api.Assertions.assertSame;
3844
import static org.junit.jupiter.api.Assertions.assertThrows;
3945
import static org.mockito.ArgumentMatchers.any;
4046
import static org.mockito.Mockito.mock;
47+
import static org.mockito.Mockito.times;
4148
import static org.mockito.Mockito.verify;
4249
import static org.mockito.Mockito.verifyNoMoreInteractions;
4350
import static org.neo4j.driver.Values.value;
@@ -251,6 +258,55 @@ void shouldCancelIfNotPulled()
251258
assertFalse( cursor.isDone() );
252259
}
253260

261+
@Test
262+
void shouldPropagateSummaryErrorViaSummaryStageWhenItIsRetrievedExternally() throws ExecutionException, InterruptedException
263+
{
264+
// Given
265+
RunResponseHandler runHandler = mock( RunResponseHandler.class );
266+
PullResponseHandler pullHandler = mock( PullResponseHandler.class );
267+
@SuppressWarnings( "unchecked" )
268+
ArgumentCaptor<BiConsumer<ResultSummary,Throwable>> summaryConsumerCaptor = ArgumentCaptor.forClass( BiConsumer.class );
269+
RxResultCursor cursor = new RxResultCursorImpl( runHandler, pullHandler );
270+
verify( pullHandler, times( 1 ) ).installSummaryConsumer( summaryConsumerCaptor.capture() );
271+
BiConsumer<ResultSummary,Throwable> summaryConsumer = summaryConsumerCaptor.getValue();
272+
RuntimeException exception = mock( RuntimeException.class );
273+
274+
// When
275+
CompletionStage<ResultSummary> summaryStage = cursor.summaryAsync();
276+
CompletionStage<Throwable> discardStage = cursor.discardAllFailureAsync();
277+
summaryConsumer.accept( null, exception );
278+
279+
// Then
280+
verify( pullHandler ).installRecordConsumer( DISCARD_RECORD_CONSUMER );
281+
verify( pullHandler ).cancel();
282+
ExecutionException actualException = assertThrows( ExecutionException.class, () -> summaryStage.toCompletableFuture().get() );
283+
assertSame( exception, actualException.getCause() );
284+
assertNull( discardStage.toCompletableFuture().get() );
285+
}
286+
287+
@Test
288+
void shouldPropagateSummaryErrorViaDiscardStageWhenSummaryStageIsNotRetrievedExternally() throws ExecutionException, InterruptedException
289+
{
290+
// Given
291+
RunResponseHandler runHandler = mock( RunResponseHandler.class );
292+
PullResponseHandler pullHandler = mock( PullResponseHandler.class );
293+
@SuppressWarnings( "unchecked" )
294+
ArgumentCaptor<BiConsumer<ResultSummary,Throwable>> summaryConsumerCaptor = ArgumentCaptor.forClass( BiConsumer.class );
295+
RxResultCursor cursor = new RxResultCursorImpl( runHandler, pullHandler );
296+
verify( pullHandler, times( 1 ) ).installSummaryConsumer( summaryConsumerCaptor.capture() );
297+
BiConsumer<ResultSummary,Throwable> summaryConsumer = summaryConsumerCaptor.getValue();
298+
RuntimeException exception = mock( RuntimeException.class );
299+
300+
// When
301+
CompletionStage<Throwable> discardStage = cursor.discardAllFailureAsync();
302+
summaryConsumer.accept( null, exception );
303+
304+
// Then
305+
verify( pullHandler ).installRecordConsumer( DISCARD_RECORD_CONSUMER );
306+
verify( pullHandler ).cancel();
307+
assertSame( exception, discardStage.toCompletableFuture().get().getCause() );
308+
}
309+
254310
private static RunResponseHandler newRunResponseHandler( CompletableFuture<Void> runFuture )
255311
{
256312
return new RunResponseHandler( runFuture, METADATA_EXTRACTOR, mock( Connection.class ), null );

testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ public class StartTest implements TestkitRequest
5151
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_raises_error_on_session_run$", skipMessage );
5252
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxRun\\.test_raises_error_on_tx(_func)?_run", skipMessage );
5353
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxRun\\.test_failed_tx_run_allows(_skipping)?_rollback", skipMessage );
54-
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.NoRouting[^.]+\\.test_should_error_on_database_shutdown_using_tx_run$", "Session close throws error" );
5554
skipMessage = "Requires investigation";
5655
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestProtocolVersions\\.test_server_agent", skipMessage );
5756
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestProtocolVersions\\.test_server_version", skipMessage );
@@ -65,17 +64,12 @@ public class StartTest implements TestkitRequest
6564
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRoutingConnectionRecvTimeout\\.test_timeout_unmanaged_tx$", skipMessage );
6665
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDisconnects\\.test_disconnect_session_on_tx_commit$", skipMessage );
6766
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRunParameters\\..*$", skipMessage );
68-
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_autocommit_transactions_should_support_timeout$", skipMessage );
69-
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_fails_on_bad_syntax$", skipMessage );
70-
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_fails_on_missing_parameter$", skipMessage );
7167
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_iteration_nested$", skipMessage );
7268
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_partial_iteration$", skipMessage );
7369
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxFuncRun\\.test_iteration_nested$", skipMessage );
7470
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxRun\\.test_broken_transaction_should_not_break_session$", skipMessage );
7571
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxRun\\.test_does_not_update_last_bookmark_on_failure$", skipMessage );
7672
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxRun\\.test_interwoven_queries$", skipMessage );
77-
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxRun\\.test_should_not_run_valid_query_in_invalid_tx$", skipMessage );
78-
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxRun\\.test_tx_timeout$", skipMessage );
7973
}
8074

8175
private StartTestBody data;

0 commit comments

Comments
 (0)