22
22
import java .util .Objects ;
23
23
import java .util .concurrent .CompletableFuture ;
24
24
import java .util .concurrent .CompletionStage ;
25
+ import java .util .concurrent .atomic .AtomicReference ;
25
26
import java .util .function .BiConsumer ;
26
27
27
28
import org .neo4j .driver .Record ;
@@ -42,6 +43,7 @@ public class RxResultCursorImpl implements RxResultCursor
42
43
private final PullResponseHandler pullHandler ;
43
44
private final Throwable runResponseError ;
44
45
private final CompletableFuture <ResultSummary > summaryFuture = new CompletableFuture <>();
46
+ private final AtomicReference <Throwable > lastReportedError = new AtomicReference <>();
45
47
private boolean resultConsumed ;
46
48
private RecordConsumerStatus consumerStatus = NOT_INSTALLED ;
47
49
@@ -81,6 +83,13 @@ public void installRecordConsumer( BiConsumer<Record,Throwable> recordConsumer )
81
83
}
82
84
consumerStatus = recordConsumer == DISCARD_RECORD_CONSUMER ?
83
85
DISCARD_INSTALLED : INSTALLED ;
86
+ recordConsumer = recordConsumer .andThen ( ( record , throwable ) ->
87
+ {
88
+ if ( throwable != null )
89
+ {
90
+ lastReportedError .set ( throwable );
91
+ }
92
+ } );
84
93
pullHandler .installRecordConsumer ( recordConsumer );
85
94
assertRunCompletedSuccessfully ();
86
95
}
@@ -105,7 +114,12 @@ public void cancel()
105
114
public CompletionStage <Throwable > discardAllFailureAsync ()
106
115
{
107
116
// calling this method will enforce discarding record stream and finish running cypher query
108
- return summaryAsync ().thenApply ( summary -> (Throwable ) null ).exceptionally ( error -> error );
117
+ return summaryAsync ().thenApply ( summary -> (Throwable ) null ).exceptionally (
118
+ error ->
119
+ {
120
+ Throwable lastReported = lastReportedError .get ();
121
+ return lastReported != null && (lastReported == error || lastReported == error .getCause ()) ? null : error ;
122
+ } );
109
123
}
110
124
111
125
@ Override
@@ -153,6 +167,7 @@ private void installSummaryConsumer()
153
167
{
154
168
// We will only report the error to summary if there is no user record consumer installed
155
169
// When a user record consumer is installed, the error will be reported to record consumer instead.
170
+ lastReportedError .set ( error );
156
171
summaryFuture .completeExceptionally ( error );
157
172
}
158
173
else if ( summary != null )
0 commit comments