|
19 | 19 | package org.neo4j.driver.internal.cursor;
|
20 | 20 |
|
21 | 21 | import org.junit.jupiter.api.Test;
|
| 22 | +import org.mockito.ArgumentCaptor; |
22 | 23 |
|
23 | 24 | import java.util.Collections;
|
24 | 25 | import java.util.List;
|
25 | 26 | import java.util.concurrent.CompletableFuture;
|
| 27 | +import java.util.concurrent.CompletionStage; |
| 28 | +import java.util.concurrent.ExecutionException; |
26 | 29 | import java.util.function.BiConsumer;
|
27 | 30 |
|
28 | 31 | import org.neo4j.driver.exceptions.ResultConsumedException;
|
29 | 32 | import org.neo4j.driver.internal.handlers.RunResponseHandler;
|
30 | 33 | import org.neo4j.driver.internal.handlers.pulln.PullResponseHandler;
|
31 | 34 | import org.neo4j.driver.internal.reactive.util.ListBasedPullHandler;
|
32 | 35 | import org.neo4j.driver.internal.spi.Connection;
|
| 36 | +import org.neo4j.driver.summary.ResultSummary; |
33 | 37 |
|
34 | 38 | import static java.util.Arrays.asList;
|
35 | 39 | import static junit.framework.TestCase.assertTrue;
|
36 |
| -import static org.junit.Assert.assertFalse; |
37 | 40 | import static org.junit.jupiter.api.Assertions.assertEquals;
|
| 41 | +import static org.junit.jupiter.api.Assertions.assertFalse; |
| 42 | +import static org.junit.jupiter.api.Assertions.assertNull; |
| 43 | +import static org.junit.jupiter.api.Assertions.assertSame; |
38 | 44 | import static org.junit.jupiter.api.Assertions.assertThrows;
|
39 | 45 | import static org.mockito.ArgumentMatchers.any;
|
40 | 46 | import static org.mockito.Mockito.mock;
|
| 47 | +import static org.mockito.Mockito.times; |
41 | 48 | import static org.mockito.Mockito.verify;
|
42 | 49 | import static org.mockito.Mockito.verifyNoMoreInteractions;
|
43 | 50 | import static org.neo4j.driver.Values.value;
|
@@ -251,6 +258,55 @@ void shouldCancelIfNotPulled()
|
251 | 258 | assertFalse( cursor.isDone() );
|
252 | 259 | }
|
253 | 260 |
|
| 261 | + @Test |
| 262 | + void shouldPropagateSummaryErrorViaSummaryStageWhenItIsRetrievedExternally() throws ExecutionException, InterruptedException |
| 263 | + { |
| 264 | + // Given |
| 265 | + RunResponseHandler runHandler = mock( RunResponseHandler.class ); |
| 266 | + PullResponseHandler pullHandler = mock( PullResponseHandler.class ); |
| 267 | + @SuppressWarnings( "unchecked" ) |
| 268 | + ArgumentCaptor<BiConsumer<ResultSummary,Throwable>> summaryConsumerCaptor = ArgumentCaptor.forClass( BiConsumer.class ); |
| 269 | + RxResultCursor cursor = new RxResultCursorImpl( runHandler, pullHandler ); |
| 270 | + verify( pullHandler, times( 1 ) ).installSummaryConsumer( summaryConsumerCaptor.capture() ); |
| 271 | + BiConsumer<ResultSummary,Throwable> summaryConsumer = summaryConsumerCaptor.getValue(); |
| 272 | + RuntimeException exception = mock( RuntimeException.class ); |
| 273 | + |
| 274 | + // When |
| 275 | + CompletionStage<ResultSummary> summaryStage = cursor.summaryAsync(); |
| 276 | + CompletionStage<Throwable> discardStage = cursor.discardAllFailureAsync(); |
| 277 | + summaryConsumer.accept( null, exception ); |
| 278 | + |
| 279 | + // Then |
| 280 | + verify( pullHandler ).installRecordConsumer( DISCARD_RECORD_CONSUMER ); |
| 281 | + verify( pullHandler ).cancel(); |
| 282 | + ExecutionException actualException = assertThrows( ExecutionException.class, () -> summaryStage.toCompletableFuture().get() ); |
| 283 | + assertSame( exception, actualException.getCause() ); |
| 284 | + assertNull( discardStage.toCompletableFuture().get() ); |
| 285 | + } |
| 286 | + |
| 287 | + @Test |
| 288 | + void shouldPropagateSummaryErrorViaDiscardStageWhenSummaryStageIsNotRetrievedExternally() throws ExecutionException, InterruptedException |
| 289 | + { |
| 290 | + // Given |
| 291 | + RunResponseHandler runHandler = mock( RunResponseHandler.class ); |
| 292 | + PullResponseHandler pullHandler = mock( PullResponseHandler.class ); |
| 293 | + @SuppressWarnings( "unchecked" ) |
| 294 | + ArgumentCaptor<BiConsumer<ResultSummary,Throwable>> summaryConsumerCaptor = ArgumentCaptor.forClass( BiConsumer.class ); |
| 295 | + RxResultCursor cursor = new RxResultCursorImpl( runHandler, pullHandler ); |
| 296 | + verify( pullHandler, times( 1 ) ).installSummaryConsumer( summaryConsumerCaptor.capture() ); |
| 297 | + BiConsumer<ResultSummary,Throwable> summaryConsumer = summaryConsumerCaptor.getValue(); |
| 298 | + RuntimeException exception = mock( RuntimeException.class ); |
| 299 | + |
| 300 | + // When |
| 301 | + CompletionStage<Throwable> discardStage = cursor.discardAllFailureAsync(); |
| 302 | + summaryConsumer.accept( null, exception ); |
| 303 | + |
| 304 | + // Then |
| 305 | + verify( pullHandler ).installRecordConsumer( DISCARD_RECORD_CONSUMER ); |
| 306 | + verify( pullHandler ).cancel(); |
| 307 | + assertSame( exception, discardStage.toCompletableFuture().get().getCause() ); |
| 308 | + } |
| 309 | + |
254 | 310 | private static RunResponseHandler newRunResponseHandler( CompletableFuture<Void> runFuture )
|
255 | 311 | {
|
256 | 312 | return new RunResponseHandler( runFuture, METADATA_EXTRACTOR, mock( Connection.class ), null );
|
|
0 commit comments